1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic.  See the overview in
bzrlib/transport/smart/__init__.py.
"""
32
import os
import socket
import sys
import urllib

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
import atexit
import weakref
from bzrlib import (
    debug,
    errors,
    osutils,
    symbol_versioning,
    trace,
    urlutils,
    )
from bzrlib.smart import client, protocol
from bzrlib.transport import ssh
""")
49
# We must not read any more than 64k at a time so we don't risk "no buffer
50
# space available" errors on some platforms. Windows in particular is likely
51
# to give error 10053 or 10055 if we read more than 64k from a socket.
52
_MAX_READ_SIZE = 64 * 1024
55
def _get_protocol_factory_for_bytes(bytes):
    """Determine the right protocol factory for 'bytes'.

    This will return an appropriate protocol factory depending on the version
    of the protocol being used, as determined by inspecting the given bytes.
    The bytes should have at least one newline byte (i.e. be a whole line),
    otherwise it's possible that a request will be incorrectly identified as
    version one.

    Typical use would be::

        factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
        server_protocol = factory(transport, write_func, root_client_path)
        server_protocol.accept_bytes(unused_bytes)

    :param bytes: a str of bytes of the start of the request.
    :returns: 2-tuple of (protocol_factory, unused_bytes).  protocol_factory is
        a callable that takes three args: transport, write_func,
        root_client_path.  unused_bytes are any bytes that were not part of a
        protocol version marker.
    """
    if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
        protocol_factory = protocol.build_server_protocol_three
        bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
    elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
        protocol_factory = protocol.SmartServerRequestProtocolTwo
        bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
    else:
        # No recognised version marker: treat the request as version one and
        # hand every byte to the protocol untouched.
        protocol_factory = protocol.SmartServerRequestProtocolOne
    return protocol_factory, bytes
87
def _get_line(read_bytes_func):
88
"""Read bytes using read_bytes_func until a newline byte.
90
This isn't particularly efficient, so should only be used when the
91
expected size of the line is quite short.
93
:returns: a tuple of two strs: (line, excess)
97
while newline_pos == -1:
98
new_bytes = read_bytes_func(1)
101
# Ran out of bytes before receiving a complete line.
103
newline_pos = bytes.find('\n')
104
line = bytes[:newline_pos+1]
105
excess = bytes[newline_pos+1:]
109
class SmartMedium(object):
    """Base class for smart protocol media, both client- and server-side.

    :ivar _push_back_buffer: a str of bytes that were read but not consumed,
        or None when nothing is buffered.
    """

    def __init__(self):
        self._push_back_buffer = None

    def _push_back(self, bytes):
        """Return unused bytes to the medium, because they belong to the next
        request(s).

        This sets the _push_back_buffer to the given bytes.

        :raises AssertionError: if bytes are already waiting in the buffer.
        """
        if self._push_back_buffer is not None:
            raise AssertionError(
                "_push_back called when self._push_back_buffer is %r"
                % (self._push_back_buffer,))
        if bytes == '':
            # Never buffer the empty string: read_bytes would hand it back
            # and callers would mistake it for EOF.
            return
        self._push_back_buffer = bytes

    def _get_push_back_buffer(self):
        """Return the buffered bytes and clear the buffer."""
        if self._push_back_buffer == '':
            raise AssertionError(
                '%s._push_back_buffer should never be the empty string, '
                'which can be confused with EOF' % (self,))
        bytes = self._push_back_buffer
        self._push_back_buffer = None
        return bytes

    def read_bytes(self, desired_count):
        """Read some bytes from this medium.

        :returns: some bytes, possibly more or less than the number requested
            in 'desired_count' depending on the medium.
        """
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        bytes_to_read = min(desired_count, _MAX_READ_SIZE)
        return self._read_bytes(bytes_to_read)

    def _read_bytes(self, count):
        """Read up to count bytes from the underlying stream (abstract)."""
        raise NotImplementedError(self._read_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        """
        line, excess = _get_line(self.read_bytes)
        self._push_back(excess)
        return line
165
class SmartServerStreamMedium(SmartMedium):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    :ivar _push_back_buffer: a str of bytes that have been read from the stream
        but not used yet, or None if there are no buffered bytes.  Subclasses
        should make sure to exhaust this buffer before reading more bytes from
        the stream.  See also the _push_back method.
    """

    def __init__(self, backing_transport, root_client_path='/'):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        :param root_client_path: stored and passed to each protocol object
            built by _build_protocol.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.root_client_path = root_client_path
        self.finished = False
        SmartMedium.__init__(self)

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep references to the sys functions we need because the sys
        # module's globals get set to None during interpreter shutdown.
        from sys import exc_info, stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception:
            # exc_info() is used rather than binding the exception in the
            # except clause so this parses on every Python version.
            stderr.write("%s terminating on exception %s\n"
                         % (self, exc_info()[1]))
            raise

    def _build_protocol(self):
        """Identify the version of the incoming request, and return a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        bytes = self._get_line()
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
        protocol = protocol_factory(
            self.backing_transport, self._write_out, self.root_client_path)
        protocol.accept_bytes(unused_bytes)
        return protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _read_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._read_bytes)
247
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Server-side stream medium that reads from and writes to a socket."""

    def __init__(self, sock, backing_transport, root_client_path='/'):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        :param root_client_path: forwarded to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(
            self, backing_transport, root_client_path=root_client_path)
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        while protocol.next_read_size():
            # We can safely try to read large chunks.  If there is less data
            # than _MAX_READ_SIZE ready, the socket will just return a short
            # read immediately rather than block.
            bytes = self.read_bytes(_MAX_READ_SIZE)
            if bytes == '':
                # Connection has been closed.
                self.finished = True
                return
            protocol.accept_bytes(bytes)

        # Anything read past the end of this request belongs to the next one.
        self._push_back(protocol.unused_data)

    def _read_bytes(self, desired_count):
        # We ignore the desired_count because on sockets it's more efficient to
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
        return self.socket.recv(_MAX_READ_SIZE)

    def terminate_due_to_error(self):
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        osutils.send_all(self.socket, bytes)
288
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Server-side stream medium that uses a pair of file-like objects."""

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        while True:
            # We need to be careful not to read past the end of the current
            # request, or else the read from the pipe will block, so we use
            # protocol.next_read_size().
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            bytes = self.read_bytes(bytes_to_read)
            if bytes == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(bytes)

    def _read_bytes(self, desired_count):
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        self._out.write(bytes)
339
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist.  See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted.  Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        Accept_bytes checks the state of the request to determine if bytes
        should be accepted.  After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if already "done".
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if not in the "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read.  It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if already "done".
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for SmartClientMediumRequest.read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read.  After that it hands off to _read_bytes to do the
        actual read.

        By default this forwards to self._medium.read_bytes because we are
        operating on the medium's stream.
        """
        return self._medium.read_bytes(count)

    def read_line(self):
        """Read one newline-terminated line of the response.

        :raises errors.ConnectionReset: if the stream ended before a newline
            was received.
        """
        line = self._read_line()
        if not line.endswith('\n'):
            # end of file encountered reading from server
            raise errors.ConnectionReset(
                "please check connectivity and permissions",
                "(and try -Dhpss if further diagnosis is required)")
        return line

    def _read_line(self):
        """Helper for SmartClientMediumRequest.read_line.

        By default this forwards to self._medium._get_line because we are
        operating on the medium's stream.
        """
        return self._medium._get_line()
475
class _DebugCounter(object):
476
"""An object that counts the HPSS calls made to each client medium.
478
When a medium is garbage-collected, or failing that when atexit functions
479
are run, the total number of calls made on that medium are reported via
484
self.counts = weakref.WeakKeyDictionary()
485
client._SmartClient.hooks.install_named_hook(
486
'call', self.increment_call_count, 'hpss call counter')
487
atexit.register(self.flush_all)
489
def track(self, medium):
490
"""Start tracking calls made to a medium.
492
This only keeps a weakref to the medium, so shouldn't affect the
495
medium_repr = repr(medium)
496
# Add this medium to the WeakKeyDictionary
497
self.counts[medium] = [0, medium_repr]
498
# Weakref callbacks are fired in reverse order of their association
499
# with the referenced object. So we add a weakref *after* adding to
500
# the WeakKeyDict so that we can report the value from it before the
501
# entry is removed by the WeakKeyDict's own callback.
502
ref = weakref.ref(medium, self.done)
504
def increment_call_count(self, params):
505
# Increment the count in the WeakKeyDictionary
506
value = self.counts[params.medium]
510
value = self.counts[ref]
511
count, medium_repr = value
512
# In case this callback is invoked for the same ref twice (by the
513
# weakref callback and by the atexit function), set the call count back
514
# to 0 so this item won't be reported twice.
517
trace.note('HPSS calls: %d %s', count, medium_repr)
520
for ref in list(self.counts.keys()):
523
# Module-wide _DebugCounter instance.  It stays None until a SmartClientMedium
# is constructed while the 'hpss' debug flag is set.
_debug_counter = None
526
class SmartClientMedium(SmartMedium):
    """Smart client is a medium for sending smart protocol requests over."""

    def __init__(self, base):
        super(SmartClientMedium, self).__init__()
        self.base = base
        self._protocol_version_error = None
        self._protocol_version = None
        self._done_hello = False
        # Be optimistic: we assume the remote end can accept new remote
        # requests until we get an error saying otherwise.
        # _remote_version_is_before tracks the bzr version the remote side
        # can be based on what we've seen so far.
        self._remote_version_is_before = None
        # Install debug hook function if debug flag is set.
        if 'hpss' in debug.debug_flags:
            global _debug_counter
            if _debug_counter is None:
                _debug_counter = _DebugCounter()
            _debug_counter.track(self)

    def _is_remote_before(self, version_tuple):
        """Is the remote side already known to be older than version_tuple?

        Returns False while nothing has been recorded by
        _remember_remote_is_before, i.e. while the remote side may still
        support RPCs introduced in any version.

        Typical use::

            needed_version = (1, 2)
            if medium._is_remote_before(needed_version):
                fallback_to_pre_1_2_rpc()
            else:
                try:
                    do_1_2_rpc()
                except UnknownSmartMethod:
                    medium._remember_remote_is_before(needed_version)
                    fallback_to_pre_1_2_rpc()

        :seealso: _remember_remote_is_before
        """
        if self._remote_version_is_before is None:
            # So far, the remote side seems to support everything
            return False
        return version_tuple >= self._remote_version_is_before

    def _remember_remote_is_before(self, version_tuple):
        """Tell this medium that the remote side is older than the given
        version.

        :raises AssertionError: if a lower version was already recorded.
        :seealso: _is_remote_before
        """
        if (self._remote_version_is_before is not None and
            version_tuple > self._remote_version_is_before):
            raise AssertionError(
                "_remember_remote_is_before(%r) called, but "
                "_remember_remote_is_before(%r) was called previously."
                % (version_tuple, self._remote_version_is_before))
        self._remote_version_is_before = version_tuple

    def protocol_version(self):
        """Find out if 'hello' smart request works."""
        if self._protocol_version_error is not None:
            raise self._protocol_version_error
        if not self._done_hello:
            try:
                medium_request = self.get_request()
                # Send a 'hello' request in protocol version one, for maximum
                # backwards compatibility.
                client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
                client_protocol.query_version()
                self._done_hello = True
            except errors.SmartProtocolError:
                # Cache the error, just like we would cache a successful
                # result.  sys.exc_info() is used so the except clause parses
                # on every Python version.
                self._protocol_version_error = sys.exc_info()[1]
                raise
        return '2'

    def should_probe(self):
        """Should RemoteBzrDirFormat.probe_transport send a smart request on
        this medium?

        Some transports are unambiguously smart-only; there's no need to check
        if the transport is able to carry smart requests, because that's all
        it is for.  In those cases, this method should return False.

        But some HTTP transports can sometimes fail to carry smart requests,
        but still be usable for accessing remote bzrdirs via plain file
        accesses.  So for those transports, their media should return True here
        so that RemoteBzrDirFormat can determine if it is appropriate for that
        medium.
        """
        return False

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """

    def remote_path_from_transport(self, transport):
        """Convert transport into a path suitable for using in a request.

        Note that the resulting remote path doesn't encode the host name or
        anything but path, so it is only safe to use it in requests sent over
        the medium from the matching transport.
        """
        medium_base = urlutils.join(self.base, '/')
        rel_url = urlutils.relative_url(medium_base, transport.base)
        return urllib.unquote(rel_url)
635
class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream.  All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self, base):
        SmartClientMedium.__init__(self, base)
        # The single in-flight request, or None; maintained by
        # SmartClientStreamMediumRequest.
        self._current_request = None

    def accept_bytes(self, bytes):
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)
674
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe, base):
        SmartClientStreamMedium.__init__(self, base)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
698
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            base=None, vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use.  See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: command to run on the remote host.  Passing
            None is deprecated; the BZR_REMOTE_PATH environment variable
            (default 'bzr') is used in that case.
        """
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        bytes_to_read = min(count, _MAX_READ_SIZE)
        return self._read_from.read(bytes_to_read)
767
# bzr:// uses port 4155 by default; the port is registered with IANA.
BZR_DEFAULT_PORT = 4155
BZR_DEFAULT_INTERFACE = None
772
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port, base):
        """Creates a client that will connect on the first use."""
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        osutils.send_all(self._socket, bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        Each address returned for the host is tried in turn until one
        connects.

        :raises errors.ConnectionError: if the host cannot be looked up or
            no address accepts the connection.
        """
        if self._connected:
            return
        if self._port is None:
            port = BZR_DEFAULT_PORT
        else:
            port = int(self._port)
        try:
            sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
                socket.SOCK_STREAM, 0, 0)
        except socket.gaierror:
            # sys.exc_info() is used instead of binding in the except clause
            # so this parses on every Python version.
            err_num, err_msg = sys.exc_info()[1].args
            raise errors.ConnectionError("failed to lookup %s:%d: %s" %
                    (self._host, port, err_msg))
        # Initialize err in case there are no addresses returned:
        err = socket.error("no address found for %s" % self._host)
        for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
            try:
                self._socket = socket.socket(family, socktype, proto)
                self._socket.setsockopt(socket.IPPROTO_TCP,
                                        socket.TCP_NODELAY, 1)
                self._socket.connect(sockaddr)
            except socket.error:
                # Keep the last failure so it can be reported after the loop.
                err = sys.exc_info()[1]
                if self._socket is not None:
                    self._socket.close()
                self._socket = None
                continue
            break
        if self._socket is None:
            # socket errors either have a (string) or (errno, string) as their
            # args.  args is always a tuple, so pick the message by length
            # rather than by type; indexing [1] on a one-element tuple would
            # raise IndexError and hide the real failure.
            if len(err.args) >= 2:
                err_msg = err.args[1]
            elif err.args:
                err_msg = err.args[0]
            else:
                err_msg = str(err)
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, err_msg))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing.  We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        # We ignore count because on sockets it's more efficient to
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
        return self._socket.recv(_MAX_READ_SIZE)
851
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        """Register this request as the medium's current request.

        :raises errors.TooManyConcurrentRequests: if the medium already has
            a request in progress.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        if self._medium._current_request is not self:
            raise AssertionError()
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()