/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2008-01-04 03:12:11 UTC
  • mfrom: (3164 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3320.
  • Revision ID: andrew.bennetts@canonical.com-20080104031211-wy4uxo2j4elvip1j
Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""The 'medium' layer for the smart servers and clients.
 
18
 
 
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
 
20
for "the quality between big and small."
 
21
 
 
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
 
23
over SSH), and pass them to and from the protocol logic.  See the overview in
 
24
bzrlib/transport/smart/__init__.py.
 
25
"""
 
26
 
 
27
import os
 
28
import socket
 
29
import sys
 
30
 
 
31
from bzrlib import (
 
32
    errors,
 
33
    osutils,
 
34
    symbol_versioning,
 
35
    )
 
36
from bzrlib.smart.protocol import (
 
37
    REQUEST_VERSION_TWO,
 
38
    SmartServerRequestProtocolOne,
 
39
    SmartServerRequestProtocolTwo,
 
40
    )
 
41
from bzrlib.transport import ssh
 
42
 
 
43
 
 
44
class SmartServerStreamMedium(object):
 
45
    """Handles smart commands coming over a stream.
 
46
 
 
47
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
48
    in-process fifo for testing.
 
49
 
 
50
    One instance is created for each connected client; it can serve multiple
 
51
    requests in the lifetime of the connection.
 
52
 
 
53
    The server passes requests through to an underlying backing transport, 
 
54
    which will typically be a LocalTransport looking at the server's filesystem.
 
55
    """
 
56
 
 
57
    def __init__(self, backing_transport, root_client_path='/'):
 
58
        """Construct new server.
 
59
 
 
60
        :param backing_transport: Transport for the directory served.
 
61
        """
 
62
        # backing_transport could be passed to serve instead of __init__
 
63
        self.backing_transport = backing_transport
 
64
        self.root_client_path = root_client_path
 
65
        self.finished = False
 
66
 
 
67
    def serve(self):
 
68
        """Serve requests until the client disconnects."""
 
69
        # Keep a reference to stderr because the sys module's globals get set to
 
70
        # None during interpreter shutdown.
 
71
        from sys import stderr
 
72
        try:
 
73
            while not self.finished:
 
74
                server_protocol = self._build_protocol()
 
75
                self._serve_one_request(server_protocol)
 
76
        except Exception, e:
 
77
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
78
            raise
 
79
 
 
80
    def _build_protocol(self):
 
81
        """Identifies the version of the incoming request, and returns an
 
82
        a protocol object that can interpret it.
 
83
 
 
84
        If more bytes than the version prefix of the request are read, they will
 
85
        be fed into the protocol before it is returned.
 
86
 
 
87
        :returns: a SmartServerRequestProtocol.
 
88
        """
 
89
        # Identify the protocol version.
 
90
        bytes = self._get_line()
 
91
        if bytes.startswith(REQUEST_VERSION_TWO):
 
92
            protocol_class = SmartServerRequestProtocolTwo
 
93
            bytes = bytes[len(REQUEST_VERSION_TWO):]
 
94
        else:
 
95
            protocol_class = SmartServerRequestProtocolOne
 
96
        protocol = protocol_class(
 
97
            self.backing_transport, self._write_out, self.root_client_path)
 
98
        protocol.accept_bytes(bytes)
 
99
        return protocol
 
100
 
 
101
    def _serve_one_request(self, protocol):
 
102
        """Read one request from input, process, send back a response.
 
103
        
 
104
        :param protocol: a SmartServerRequestProtocol.
 
105
        """
 
106
        try:
 
107
            self._serve_one_request_unguarded(protocol)
 
108
        except KeyboardInterrupt:
 
109
            raise
 
110
        except Exception, e:
 
111
            self.terminate_due_to_error()
 
112
 
 
113
    def terminate_due_to_error(self):
 
114
        """Called when an unhandled exception from the protocol occurs."""
 
115
        raise NotImplementedError(self.terminate_due_to_error)
 
116
 
 
117
    def _get_bytes(self, desired_count):
 
118
        """Get some bytes from the medium.
 
119
 
 
120
        :param desired_count: number of bytes we want to read.
 
121
        """
 
122
        raise NotImplementedError(self._get_bytes)
 
123
 
 
124
    def _get_line(self):
 
125
        """Read bytes from this request's response until a newline byte.
 
126
        
 
127
        This isn't particularly efficient, so should only be used when the
 
128
        expected size of the line is quite short.
 
129
 
 
130
        :returns: a string of bytes ending in a newline (byte 0x0A).
 
131
        """
 
132
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
 
133
        line = ''
 
134
        while not line or line[-1] != '\n':
 
135
            new_char = self._get_bytes(1)
 
136
            line += new_char
 
137
            if new_char == '':
 
138
                # Ran out of bytes before receiving a complete line.
 
139
                break
 
140
        return line
 
141
 
 
142
 
 
143
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
144
 
 
145
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
146
        """Constructor.
 
147
 
 
148
        :param sock: the socket the server will read from.  It will be put
 
149
            into blocking mode.
 
150
        """
 
151
        SmartServerStreamMedium.__init__(
 
152
            self, backing_transport, root_client_path=root_client_path)
 
153
        self.push_back = ''
 
154
        sock.setblocking(True)
 
155
        self.socket = sock
 
156
 
 
157
    def _serve_one_request_unguarded(self, protocol):
 
158
        while protocol.next_read_size():
 
159
            if self.push_back:
 
160
                protocol.accept_bytes(self.push_back)
 
161
                self.push_back = ''
 
162
            else:
 
163
                bytes = self._get_bytes(4096)
 
164
                if bytes == '':
 
165
                    self.finished = True
 
166
                    return
 
167
                protocol.accept_bytes(bytes)
 
168
        
 
169
        self.push_back = protocol.excess_buffer
 
170
 
 
171
    def _get_bytes(self, desired_count):
 
172
        # We ignore the desired_count because on sockets it's more efficient to
 
173
        # read 4k at a time.
 
174
        return self.socket.recv(4096)
 
175
    
 
176
    def terminate_due_to_error(self):
 
177
        """Called when an unhandled exception from the protocol occurs."""
 
178
        # TODO: This should log to a server log file, but no such thing
 
179
        # exists yet.  Andrew Bennetts 2006-09-29.
 
180
        self.socket.close()
 
181
        self.finished = True
 
182
 
 
183
    def _write_out(self, bytes):
 
184
        osutils.send_all(self.socket, bytes)
 
185
 
 
186
 
 
187
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
188
 
 
189
    def __init__(self, in_file, out_file, backing_transport):
 
190
        """Construct new server.
 
191
 
 
192
        :param in_file: Python file from which requests can be read.
 
193
        :param out_file: Python file to write responses.
 
194
        :param backing_transport: Transport for the directory served.
 
195
        """
 
196
        SmartServerStreamMedium.__init__(self, backing_transport)
 
197
        if sys.platform == 'win32':
 
198
            # force binary mode for files
 
199
            import msvcrt
 
200
            for f in (in_file, out_file):
 
201
                fileno = getattr(f, 'fileno', None)
 
202
                if fileno:
 
203
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
204
        self._in = in_file
 
205
        self._out = out_file
 
206
 
 
207
    def _serve_one_request_unguarded(self, protocol):
 
208
        while True:
 
209
            bytes_to_read = protocol.next_read_size()
 
210
            if bytes_to_read == 0:
 
211
                # Finished serving this request.
 
212
                self._out.flush()
 
213
                return
 
214
            bytes = self._get_bytes(bytes_to_read)
 
215
            if bytes == '':
 
216
                # Connection has been closed.
 
217
                self.finished = True
 
218
                self._out.flush()
 
219
                return
 
220
            protocol.accept_bytes(bytes)
 
221
 
 
222
    def _get_bytes(self, desired_count):
 
223
        return self._in.read(desired_count)
 
224
 
 
225
    def terminate_due_to_error(self):
 
226
        # TODO: This should log to a server log file, but no such thing
 
227
        # exists yet.  Andrew Bennetts 2006-09-29.
 
228
        self._out.close()
 
229
        self.finished = True
 
230
 
 
231
    def _write_out(self, bytes):
 
232
        self._out.write(bytes)
 
233
 
 
234
 
 
235
class SmartClientMediumRequest(object):
 
236
    """A request on a SmartClientMedium.
 
237
 
 
238
    Each request allows bytes to be provided to it via accept_bytes, and then
 
239
    the response bytes to be read via read_bytes.
 
240
 
 
241
    For instance:
 
242
    request.accept_bytes('123')
 
243
    request.finished_writing()
 
244
    result = request.read_bytes(3)
 
245
    request.finished_reading()
 
246
 
 
247
    It is up to the individual SmartClientMedium whether multiple concurrent
 
248
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
249
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
250
    details on concurrency and pipelining.
 
251
    """
 
252
 
 
253
    def __init__(self, medium):
 
254
        """Construct a SmartClientMediumRequest for the medium medium."""
 
255
        self._medium = medium
 
256
        # we track state by constants - we may want to use the same
 
257
        # pattern as BodyReader if it gets more complex.
 
258
        # valid states are: "writing", "reading", "done"
 
259
        self._state = "writing"
 
260
 
 
261
    def accept_bytes(self, bytes):
 
262
        """Accept bytes for inclusion in this request.
 
263
 
 
264
        This method may not be be called after finished_writing() has been
 
265
        called.  It depends upon the Medium whether or not the bytes will be
 
266
        immediately transmitted. Message based Mediums will tend to buffer the
 
267
        bytes until finished_writing() is called.
 
268
 
 
269
        :param bytes: A bytestring.
 
270
        """
 
271
        if self._state != "writing":
 
272
            raise errors.WritingCompleted(self)
 
273
        self._accept_bytes(bytes)
 
274
 
 
275
    def _accept_bytes(self, bytes):
 
276
        """Helper for accept_bytes.
 
277
 
 
278
        Accept_bytes checks the state of the request to determing if bytes
 
279
        should be accepted. After that it hands off to _accept_bytes to do the
 
280
        actual acceptance.
 
281
        """
 
282
        raise NotImplementedError(self._accept_bytes)
 
283
 
 
284
    def finished_reading(self):
 
285
        """Inform the request that all desired data has been read.
 
286
 
 
287
        This will remove the request from the pipeline for its medium (if the
 
288
        medium supports pipelining) and any further calls to methods on the
 
289
        request will raise ReadingCompleted.
 
290
        """
 
291
        if self._state == "writing":
 
292
            raise errors.WritingNotComplete(self)
 
293
        if self._state != "reading":
 
294
            raise errors.ReadingCompleted(self)
 
295
        self._state = "done"
 
296
        self._finished_reading()
 
297
 
 
298
    def _finished_reading(self):
 
299
        """Helper for finished_reading.
 
300
 
 
301
        finished_reading checks the state of the request to determine if 
 
302
        finished_reading is allowed, and if it is hands off to _finished_reading
 
303
        to perform the action.
 
304
        """
 
305
        raise NotImplementedError(self._finished_reading)
 
306
 
 
307
    def finished_writing(self):
 
308
        """Finish the writing phase of this request.
 
309
 
 
310
        This will flush all pending data for this request along the medium.
 
311
        After calling finished_writing, you may not call accept_bytes anymore.
 
312
        """
 
313
        if self._state != "writing":
 
314
            raise errors.WritingCompleted(self)
 
315
        self._state = "reading"
 
316
        self._finished_writing()
 
317
 
 
318
    def _finished_writing(self):
 
319
        """Helper for finished_writing.
 
320
 
 
321
        finished_writing checks the state of the request to determine if 
 
322
        finished_writing is allowed, and if it is hands off to _finished_writing
 
323
        to perform the action.
 
324
        """
 
325
        raise NotImplementedError(self._finished_writing)
 
326
 
 
327
    def read_bytes(self, count):
 
328
        """Read bytes from this requests response.
 
329
 
 
330
        This method will block and wait for count bytes to be read. It may not
 
331
        be invoked until finished_writing() has been called - this is to ensure
 
332
        a message-based approach to requests, for compatibility with message
 
333
        based mediums like HTTP.
 
334
        """
 
335
        if self._state == "writing":
 
336
            raise errors.WritingNotComplete(self)
 
337
        if self._state != "reading":
 
338
            raise errors.ReadingCompleted(self)
 
339
        return self._read_bytes(count)
 
340
 
 
341
    def _read_bytes(self, count):
 
342
        """Helper for read_bytes.
 
343
 
 
344
        read_bytes checks the state of the request to determing if bytes
 
345
        should be read. After that it hands off to _read_bytes to do the
 
346
        actual read.
 
347
        """
 
348
        raise NotImplementedError(self._read_bytes)
 
349
 
 
350
    def read_line(self):
 
351
        """Read bytes from this request's response until a newline byte.
 
352
        
 
353
        This isn't particularly efficient, so should only be used when the
 
354
        expected size of the line is quite short.
 
355
 
 
356
        :returns: a string of bytes ending in a newline (byte 0x0A).
 
357
        """
 
358
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
 
359
        line = ''
 
360
        while not line or line[-1] != '\n':
 
361
            new_char = self.read_bytes(1)
 
362
            line += new_char
 
363
            if new_char == '':
 
364
                raise errors.SmartProtocolError(
 
365
                    'unexpected end of file reading from server')
 
366
        return line
 
367
 
 
368
 
 
369
class SmartClientMedium(object):
 
370
    """Smart client is a medium for sending smart protocol requests over."""
 
371
 
 
372
    def disconnect(self):
 
373
        """If this medium maintains a persistent connection, close it.
 
374
        
 
375
        The default implementation does nothing.
 
376
        """
 
377
        
 
378
 
 
379
class SmartClientStreamMedium(SmartClientMedium):
 
380
    """Stream based medium common class.
 
381
 
 
382
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
383
    SmartClientStreamMediumRequest for their requests, and should implement
 
384
    _accept_bytes and _read_bytes to allow the request objects to send and
 
385
    receive bytes.
 
386
    """
 
387
 
 
388
    def __init__(self):
 
389
        self._current_request = None
 
390
 
 
391
    def accept_bytes(self, bytes):
 
392
        self._accept_bytes(bytes)
 
393
 
 
394
    def __del__(self):
 
395
        """The SmartClientStreamMedium knows how to close the stream when it is
 
396
        finished with it.
 
397
        """
 
398
        self.disconnect()
 
399
 
 
400
    def _flush(self):
 
401
        """Flush the output stream.
 
402
        
 
403
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
404
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
405
        """
 
406
        raise NotImplementedError(self._flush)
 
407
 
 
408
    def get_request(self):
 
409
        """See SmartClientMedium.get_request().
 
410
 
 
411
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
412
        for get_request.
 
413
        """
 
414
        return SmartClientStreamMediumRequest(self)
 
415
 
 
416
    def read_bytes(self, count):
 
417
        return self._read_bytes(count)
 
418
 
 
419
 
 
420
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
421
    """A client medium using simple pipes.
 
422
    
 
423
    This client does not manage the pipes: it assumes they will always be open.
 
424
    """
 
425
 
 
426
    def __init__(self, readable_pipe, writeable_pipe):
 
427
        SmartClientStreamMedium.__init__(self)
 
428
        self._readable_pipe = readable_pipe
 
429
        self._writeable_pipe = writeable_pipe
 
430
 
 
431
    def _accept_bytes(self, bytes):
 
432
        """See SmartClientStreamMedium.accept_bytes."""
 
433
        self._writeable_pipe.write(bytes)
 
434
 
 
435
    def _flush(self):
 
436
        """See SmartClientStreamMedium._flush()."""
 
437
        self._writeable_pipe.flush()
 
438
 
 
439
    def _read_bytes(self, count):
 
440
        """See SmartClientStreamMedium._read_bytes."""
 
441
        return self._readable_pipe.read(count)
 
442
 
 
443
 
 
444
class SmartSSHClientMedium(SmartClientStreamMedium):
 
445
    """A client medium using SSH."""
 
446
    
 
447
    def __init__(self, host, port=None, username=None, password=None,
 
448
            vendor=None, bzr_remote_path=None):
 
449
        """Creates a client that will connect on the first use.
 
450
        
 
451
        :param vendor: An optional override for the ssh vendor to use. See
 
452
            bzrlib.transport.ssh for details on ssh vendors.
 
453
        """
 
454
        SmartClientStreamMedium.__init__(self)
 
455
        self._connected = False
 
456
        self._host = host
 
457
        self._password = password
 
458
        self._port = port
 
459
        self._username = username
 
460
        self._read_from = None
 
461
        self._ssh_connection = None
 
462
        self._vendor = vendor
 
463
        self._write_to = None
 
464
        self._bzr_remote_path = bzr_remote_path
 
465
        if self._bzr_remote_path is None:
 
466
            symbol_versioning.warn(
 
467
                'bzr_remote_path is required as of bzr 0.92',
 
468
                DeprecationWarning, stacklevel=2)
 
469
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
470
 
 
471
    def _accept_bytes(self, bytes):
 
472
        """See SmartClientStreamMedium.accept_bytes."""
 
473
        self._ensure_connection()
 
474
        self._write_to.write(bytes)
 
475
 
 
476
    def disconnect(self):
 
477
        """See SmartClientMedium.disconnect()."""
 
478
        if not self._connected:
 
479
            return
 
480
        self._read_from.close()
 
481
        self._write_to.close()
 
482
        self._ssh_connection.close()
 
483
        self._connected = False
 
484
 
 
485
    def _ensure_connection(self):
 
486
        """Connect this medium if not already connected."""
 
487
        if self._connected:
 
488
            return
 
489
        if self._vendor is None:
 
490
            vendor = ssh._get_ssh_vendor()
 
491
        else:
 
492
            vendor = self._vendor
 
493
        self._ssh_connection = vendor.connect_ssh(self._username,
 
494
                self._password, self._host, self._port,
 
495
                command=[self._bzr_remote_path, 'serve', '--inet',
 
496
                         '--directory=/', '--allow-writes'])
 
497
        self._read_from, self._write_to = \
 
498
            self._ssh_connection.get_filelike_channels()
 
499
        self._connected = True
 
500
 
 
501
    def _flush(self):
 
502
        """See SmartClientStreamMedium._flush()."""
 
503
        self._write_to.flush()
 
504
 
 
505
    def _read_bytes(self, count):
 
506
        """See SmartClientStreamMedium.read_bytes."""
 
507
        if not self._connected:
 
508
            raise errors.MediumNotConnected(self)
 
509
        return self._read_from.read(count)
 
510
 
 
511
 
 
512
# Port 4155 is the default port for bzr://, registered with IANA.
 
513
BZR_DEFAULT_INTERFACE = '0.0.0.0'
 
514
BZR_DEFAULT_PORT = 4155
 
515
 
 
516
 
 
517
class SmartTCPClientMedium(SmartClientStreamMedium):
 
518
    """A client medium using TCP."""
 
519
    
 
520
    def __init__(self, host, port):
 
521
        """Creates a client that will connect on the first use."""
 
522
        SmartClientStreamMedium.__init__(self)
 
523
        self._connected = False
 
524
        self._host = host
 
525
        self._port = port
 
526
        self._socket = None
 
527
 
 
528
    def _accept_bytes(self, bytes):
 
529
        """See SmartClientMedium.accept_bytes."""
 
530
        self._ensure_connection()
 
531
        osutils.send_all(self._socket, bytes)
 
532
 
 
533
    def disconnect(self):
 
534
        """See SmartClientMedium.disconnect()."""
 
535
        if not self._connected:
 
536
            return
 
537
        self._socket.close()
 
538
        self._socket = None
 
539
        self._connected = False
 
540
 
 
541
    def _ensure_connection(self):
 
542
        """Connect this medium if not already connected."""
 
543
        if self._connected:
 
544
            return
 
545
        self._socket = socket.socket()
 
546
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
547
        if self._port is None:
 
548
            port = BZR_DEFAULT_PORT
 
549
        else:
 
550
            port = int(self._port)
 
551
        result = self._socket.connect_ex((self._host, port))
 
552
        if result:
 
553
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
554
                    (self._host, port, os.strerror(result)))
 
555
        self._connected = True
 
556
 
 
557
    def _flush(self):
 
558
        """See SmartClientStreamMedium._flush().
 
559
        
 
560
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
561
        add a means to do a flush, but that can be done in the future.
 
562
        """
 
563
 
 
564
    def _read_bytes(self, count):
 
565
        """See SmartClientMedium.read_bytes."""
 
566
        if not self._connected:
 
567
            raise errors.MediumNotConnected(self)
 
568
        return self._socket.recv(count)
 
569
 
 
570
 
 
571
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
572
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
573
 
 
574
    def __init__(self, medium):
 
575
        SmartClientMediumRequest.__init__(self, medium)
 
576
        # check that we are safe concurrency wise. If some streams start
 
577
        # allowing concurrent requests - i.e. via multiplexing - then this
 
578
        # assert should be moved to SmartClientStreamMedium.get_request,
 
579
        # and the setting/unsetting of _current_request likewise moved into
 
580
        # that class : but its unneeded overhead for now. RBC 20060922
 
581
        if self._medium._current_request is not None:
 
582
            raise errors.TooManyConcurrentRequests(self._medium)
 
583
        self._medium._current_request = self
 
584
 
 
585
    def _accept_bytes(self, bytes):
 
586
        """See SmartClientMediumRequest._accept_bytes.
 
587
        
 
588
        This forwards to self._medium._accept_bytes because we are operating
 
589
        on the mediums stream.
 
590
        """
 
591
        self._medium._accept_bytes(bytes)
 
592
 
 
593
    def _finished_reading(self):
 
594
        """See SmartClientMediumRequest._finished_reading.
 
595
 
 
596
        This clears the _current_request on self._medium to allow a new 
 
597
        request to be created.
 
598
        """
 
599
        assert self._medium._current_request is self
 
600
        self._medium._current_request = None
 
601
        
 
602
    def _finished_writing(self):
 
603
        """See SmartClientMediumRequest._finished_writing.
 
604
 
 
605
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
606
        """
 
607
        self._medium._flush()
 
608
 
 
609
    def _read_bytes(self, count):
 
610
        """See SmartClientMediumRequest._read_bytes.
 
611
        
 
612
        This forwards to self._medium._read_bytes because we are operating
 
613
        on the mediums stream.
 
614
        """
 
615
        return self._medium._read_bytes(count)
 
616