/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: John Arbash Meinel
  • Date: 2011-09-23 20:24:51 UTC
  • mto: This revision was merged to the branch mainline in revision 6170.
  • Revision ID: john@arbash-meinel.com-20110923202451-5hrc3s8zmmh9tnoi
Add a nicer repr for shutting down.

Also, if we disconnect the stdin/stdout handles, it looks like we get ValueError
trying to flush them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
28
29
import sys
 
30
import time
29
31
import urllib
30
32
 
 
33
import bzrlib
31
34
from bzrlib.lazy_import import lazy_import
32
35
lazy_import(globals(), """
33
 
import atexit
 
36
import select
34
37
import socket
35
38
import thread
36
39
import weakref
38
41
from bzrlib import (
39
42
    debug,
40
43
    errors,
41
 
    symbol_versioning,
42
44
    trace,
43
45
    ui,
44
46
    urlutils,
45
47
    )
46
 
from bzrlib.smart import client, protocol, request, vfs
 
48
from bzrlib.smart import client, protocol, request, signals, vfs
47
49
from bzrlib.transport import ssh
48
50
""")
49
51
from bzrlib import osutils
176
178
        ui.ui_factory.report_transport_activity(self, bytes, direction)
177
179
 
178
180
 
 
181
_bad_file_descriptor = (errno.EBADF,)
 
182
if sys.platform == 'win32':
 
183
    # Given on Windows if you pass a closed socket to select.select. Probably
 
184
    # also given if you pass a file handle to select.
 
185
    WSAENOTSOCK = 10038
 
186
    _bad_file_descriptor += (WSAENOTSOCK,)
 
187
 
 
188
 
179
189
class SmartServerStreamMedium(SmartMedium):
180
190
    """Handles smart commands coming over a stream.
181
191
 
194
204
        the stream.  See also the _push_back method.
195
205
    """
196
206
 
197
 
    def __init__(self, backing_transport, root_client_path='/'):
 
207
    _timer = time.time
 
208
 
 
209
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
210
        """Construct new server.
199
211
 
200
212
        :param backing_transport: Transport for the directory served.
203
215
        self.backing_transport = backing_transport
204
216
        self.root_client_path = root_client_path
205
217
        self.finished = False
 
218
        if timeout is None:
 
219
            raise AssertionError('You must supply a timeout.')
 
220
        self._client_timeout = timeout
 
221
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
222
        SmartMedium.__init__(self)
207
223
 
208
224
    def serve(self):
213
229
        try:
214
230
            while not self.finished:
215
231
                server_protocol = self._build_protocol()
 
232
                # TODO: This seems inelegant:
 
233
                if server_protocol is None:
 
234
                    # We could 'continue' only to notice that self.finished is
 
235
                    # True...
 
236
                    trace.mutter('Stopped while waiting for request: %s\n'
 
237
                                 % (self,))
 
238
                    break
216
239
                self._serve_one_request(server_protocol)
 
240
        except errors.ConnectionTimeout, e:
 
241
            trace.note('%s' % (e,))
 
242
            trace.log_exception_quietly()
 
243
            self._disconnect_client()
 
244
            # We reported it, no reason to make a big fuss.
 
245
            return
217
246
        except Exception, e:
218
247
            stderr.write("%s terminating on exception %s\n" % (self, e))
219
248
            raise
 
249
        self._disconnect_client()
 
250
 
 
251
    def _stop_gracefully(self):
 
252
        """When we finish this message, stop looking for more."""
 
253
        trace.mutter('Stopping %s' % (self,))
 
254
        self.finished = True
 
255
 
 
256
    def _disconnect_client(self):
 
257
        """Close the current connection. We stopped due to a timeout/etc."""
 
258
        # The default implementation is a no-op, because that is all we used to
 
259
        # do when disconnecting from a client. I suppose we never had the
 
260
        # *server* initiate a disconnect, before
 
261
 
 
262
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
263
        """Wait for more bytes to be read, but timeout if none available.
 
264
 
 
265
        This allows us to detect idle connections, and stop trying to read from
 
266
        them, without setting the socket itself to non-blocking. This also
 
267
        allows us to specify when we watch for idle timeouts.
 
268
 
 
269
        :return: Did we timeout? (True if we timed out, False if there is data
 
270
            to be read)
 
271
        """
 
272
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
220
273
 
221
274
    def _build_protocol(self):
222
275
        """Identifies the version of the incoming request, and returns an
227
280
 
228
281
        :returns: a SmartServerRequestProtocol.
229
282
        """
 
283
        self._wait_for_bytes_with_timeout(self._client_timeout)
 
284
        if self.finished:
 
285
            # We're stopping, so don't try to do any more work
 
286
            return None
230
287
        bytes = self._get_line()
231
288
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
289
        protocol = protocol_factory(
234
291
        protocol.accept_bytes(unused_bytes)
235
292
        return protocol
236
293
 
 
294
    def _wait_on_descriptor(self, fd, timeout_seconds):
 
295
        """select() on a file descriptor, waiting for nonblocking read()
 
296
 
 
297
        This will raise a ConnectionTimeout exception if we do not get a
 
298
        readable handle before timeout_seconds.
 
299
        :return: None
 
300
        """
 
301
        t_end = self._timer() + timeout_seconds
 
302
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
 
303
        rs = xs = None
 
304
        while not rs and not xs and self._timer() < t_end:
 
305
            if self.finished:
 
306
                return
 
307
            try:
 
308
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
 
309
            except (select.error, socket.error) as e:
 
310
                err = getattr(e, 'errno', None)
 
311
                if err is None and getattr(e, 'args', None) is not None:
 
312
                    # select.error doesn't have 'errno', it just has args[0]
 
313
                    err = e.args[0]
 
314
                if err in _bad_file_descriptor:
 
315
                    return # Not a socket indicates read() will fail
 
316
                elif err == errno.EINTR:
 
317
                    # Interrupted, keep looping.
 
318
                    continue
 
319
                raise
 
320
        if rs or xs:
 
321
            return
 
322
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
 
323
                                       % (timeout_seconds,))
 
324
 
237
325
    def _serve_one_request(self, protocol):
238
326
        """Read one request from input, process, send back a response.
239
327
 
260
348
 
261
349
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
262
350
 
263
 
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
351
    def __init__(self, sock, backing_transport, root_client_path='/',
 
352
                 timeout=None):
264
353
        """Constructor.
265
354
 
266
355
        :param sock: the socket the server will read from.  It will be put
267
356
            into blocking mode.
268
357
        """
269
358
        SmartServerStreamMedium.__init__(
270
 
            self, backing_transport, root_client_path=root_client_path)
 
359
            self, backing_transport, root_client_path=root_client_path,
 
360
            timeout=timeout)
271
361
        sock.setblocking(True)
272
362
        self.socket = sock
 
363
        # Get the getpeername now, as we might be closed later when we care.
 
364
        try:
 
365
            self._client_info = sock.getpeername()
 
366
        except socket.error:
 
367
            self._client_info = '<unknown>'
 
368
 
 
369
    def __str__(self):
 
370
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
 
371
 
 
372
    def __repr__(self):
 
373
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
 
374
            self._client_info)
273
375
 
274
376
    def _serve_one_request_unguarded(self, protocol):
275
377
        while protocol.next_read_size():
284
386
 
285
387
        self._push_back(protocol.unused_data)
286
388
 
 
389
    def _disconnect_client(self):
 
390
        """Close the current connection. We stopped due to a timeout/etc."""
 
391
        self.socket.close()
 
392
 
 
393
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
394
        """Wait for more bytes to be read, but timeout if none available.
 
395
 
 
396
        This allows us to detect idle connections, and stop trying to read from
 
397
        them, without setting the socket itself to non-blocking. This also
 
398
        allows us to specify when we watch for idle timeouts.
 
399
 
 
400
        :return: None, this will raise ConnectionTimeout if we time out before
 
401
            data is available.
 
402
        """
 
403
        return self._wait_on_descriptor(self.socket, timeout_seconds)
 
404
 
287
405
    def _read_bytes(self, desired_count):
288
406
        return osutils.read_bytes_from_socket(
289
407
            self.socket, self._report_activity)
306
424
 
307
425
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
308
426
 
309
 
    def __init__(self, in_file, out_file, backing_transport):
 
427
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
428
        """Construct new server.
311
429
 
312
430
        :param in_file: Python file from which requests can be read.
313
431
        :param out_file: Python file to write responses.
314
432
        :param backing_transport: Transport for the directory served.
315
433
        """
316
 
        SmartServerStreamMedium.__init__(self, backing_transport)
 
434
        SmartServerStreamMedium.__init__(self, backing_transport,
 
435
            timeout=timeout)
317
436
        if sys.platform == 'win32':
318
437
            # force binary mode for files
319
438
            import msvcrt
324
443
        self._in = in_file
325
444
        self._out = out_file
326
445
 
 
446
    def serve(self):
 
447
        """See SmartServerStreamMedium.serve"""
 
448
        # This is the regular serve, except it adds signal trapping for soft
 
449
        # shutdown.
 
450
        stop_gracefully = self._stop_gracefully
 
451
        signals.register_on_hangup(id(self), stop_gracefully)
 
452
        try:
 
453
            return super(SmartServerPipeStreamMedium, self).serve()
 
454
        finally:
 
455
            signals.unregister_on_hangup(id(self))
 
456
 
327
457
    def _serve_one_request_unguarded(self, protocol):
328
458
        while True:
329
459
            # We need to be careful not to read past the end of the current
342
472
                return
343
473
            protocol.accept_bytes(bytes)
344
474
 
 
475
    def _disconnect_client(self):
 
476
        self._in.close()
 
477
        self._out.close()
 
478
 
 
479
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
480
        """Wait for more bytes to be read, but timeout if none available.
 
481
 
 
482
        This allows us to detect idle connections, and stop trying to read from
 
483
        them, without setting the socket itself to non-blocking. This also
 
484
        allows us to specify when we watch for idle timeouts.
 
485
 
 
486
        :return: None, this will raise ConnectionTimeout if we time out before
 
487
            data is available.
 
488
        """
 
489
        if (getattr(self._in, 'fileno', None) is None
 
490
            or sys.platform == 'win32'):
 
491
            # You can't select() file descriptors on Windows.
 
492
            return
 
493
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
494
 
345
495
    def _read_bytes(self, desired_count):
346
496
        return self._in.read(desired_count)
347
497
 
491
641
        return self._medium._get_line()
492
642
 
493
643
 
 
644
class _VfsRefuser(object):
 
645
    """An object that refuses all VFS requests.
 
646
 
 
647
    """
 
648
 
 
649
    def __init__(self):
 
650
        client._SmartClient.hooks.install_named_hook(
 
651
            'call', self.check_vfs, 'vfs refuser')
 
652
 
 
653
    def check_vfs(self, params):
 
654
        try:
 
655
            request_method = request.request_handlers.get(params.method)
 
656
        except KeyError:
 
657
            # A method we don't know about doesn't count as a VFS method.
 
658
            return
 
659
        if issubclass(request_method, vfs.VfsRequest):
 
660
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
 
661
 
 
662
 
494
663
class _DebugCounter(object):
495
664
    """An object that counts the HPSS calls made to each client medium.
496
665
 
497
 
    When a medium is garbage-collected, or failing that when atexit functions
498
 
    are run, the total number of calls made on that medium are reported via
499
 
    trace.note.
 
666
    When a medium is garbage-collected, or failing that when
 
667
    bzrlib.global_state exits, the total number of calls made on that medium
 
668
    are reported via trace.note.
500
669
    """
501
670
 
502
671
    def __init__(self):
503
672
        self.counts = weakref.WeakKeyDictionary()
504
673
        client._SmartClient.hooks.install_named_hook(
505
674
            'call', self.increment_call_count, 'hpss call counter')
506
 
        atexit.register(self.flush_all)
 
675
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
676
 
508
677
    def track(self, medium):
509
678
        """Start tracking calls made to a medium.
551
720
            self.done(ref)
552
721
 
553
722
_debug_counter = None
 
723
_vfs_refuser = None
554
724
 
555
725
 
556
726
class SmartClientMedium(SmartMedium):
573
743
            if _debug_counter is None:
574
744
                _debug_counter = _DebugCounter()
575
745
            _debug_counter.track(self)
 
746
        if 'hpss_client_no_vfs' in debug.debug_flags:
 
747
            global _vfs_refuser
 
748
            if _vfs_refuser is None:
 
749
                _vfs_refuser = _VfsRefuser()
576
750
 
577
751
    def _is_remote_before(self, version_tuple):
578
752
        """Is it possible the remote side supports RPCs for a given version?
715
889
    """A client medium using simple pipes.
716
890
 
717
891
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
892
    """
723
893
 
724
894
    def __init__(self, readable_pipe, writeable_pipe, base):
737
907
 
738
908
    def _read_bytes(self, count):
739
909
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
910
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
911
        bytes = self._readable_pipe.read(bytes_to_read)
741
912
        self._report_activity(len(bytes), 'read')
742
913
        return bytes
743
914
 
744
915
 
 
916
class SSHParams(object):
 
917
    """A set of parameters for starting a remote bzr via SSH."""
 
918
 
 
919
    def __init__(self, host, port=None, username=None, password=None,
 
920
            bzr_remote_path='bzr'):
 
921
        self.host = host
 
922
        self.port = port
 
923
        self.username = username
 
924
        self.password = password
 
925
        self.bzr_remote_path = bzr_remote_path
 
926
 
 
927
 
745
928
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
 
929
    """A client medium using SSH.
 
930
    
 
931
    It delegates IO to a SmartClientSocketMedium or
 
932
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
933
    """
747
934
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
935
    def __init__(self, base, ssh_params, vendor=None):
750
936
        """Creates a client that will connect on the first use.
751
937
 
 
938
        :param ssh_params: A SSHParams instance.
752
939
        :param vendor: An optional override for the ssh vendor to use. See
753
940
            bzrlib.transport.ssh for details on ssh vendors.
754
941
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
942
        self._real_medium = None
 
943
        self._ssh_params = ssh_params
760
944
        # for the benefit of progress making a short description of this
761
945
        # transport
762
946
        self._scheme = 'bzr+ssh'
764
948
        # _DebugCounter so we have to store all the values used in our repr
765
949
        # method before calling the super init.
766
950
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
951
        self._vendor = vendor
768
952
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
953
 
773
954
    def __repr__(self):
774
 
        if self._port is None:
 
955
        if self._ssh_params.port is None:
775
956
            maybe_port = ''
776
957
        else:
777
 
            maybe_port = ':%s' % self._port
 
958
            maybe_port = ':%s' % self._ssh_params.port
778
959
        return "%s(%s://%s@%s%s/)" % (
779
960
            self.__class__.__name__,
780
961
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
962
            self._ssh_params.username,
 
963
            self._ssh_params.host,
783
964
            maybe_port)
784
965
 
785
966
    def _accept_bytes(self, bytes):
786
967
        """See SmartClientStreamMedium.accept_bytes."""
787
968
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
969
        self._real_medium.accept_bytes(bytes)
790
970
 
791
971
    def disconnect(self):
792
972
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
973
        if self._real_medium is not None:
 
974
            self._real_medium.disconnect()
 
975
            self._real_medium = None
 
976
        if self._ssh_connection is not None:
 
977
            self._ssh_connection.close()
 
978
            self._ssh_connection = None
799
979
 
800
980
    def _ensure_connection(self):
801
981
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
982
        if self._real_medium is not None:
803
983
            return
804
984
        if self._vendor is None:
805
985
            vendor = ssh._get_ssh_vendor()
806
986
        else:
807
987
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
988
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
989
                self._ssh_params.password, self._ssh_params.host,
 
990
                self._ssh_params.port,
 
991
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
992
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
993
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
994
        if io_kind == 'socket':
 
995
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
996
                self.base, io_object)
 
997
        elif io_kind == 'pipes':
 
998
            read_from, write_to = io_object
 
999
            self._real_medium = SmartSimplePipesClientMedium(
 
1000
                read_from, write_to, self.base)
 
1001
        else:
 
1002
            raise AssertionError(
 
1003
                "Unexpected io_kind %r from %r"
 
1004
                % (io_kind, self._ssh_connection))
815
1005
 
816
1006
    def _flush(self):
817
1007
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
1008
        self._real_medium._flush()
819
1009
 
820
1010
    def _read_bytes(self, count):
821
1011
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
1012
        if self._real_medium is None:
823
1013
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
1014
        return self._real_medium.read_bytes(count)
828
1015
 
829
1016
 
830
1017
# Port 4155 is the default port for bzr://, registered with IANA.
832
1019
BZR_DEFAULT_PORT = 4155
833
1020
 
834
1021
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
1022
class SmartClientSocketMedium(SmartClientStreamMedium):
 
1023
    """A client medium using a socket.
 
1024
    
 
1025
    This class isn't usable directly.  Use one of its subclasses instead.
 
1026
    """
837
1027
 
838
 
    def __init__(self, host, port, base):
839
 
        """Creates a client that will connect on the first use."""
 
1028
    def __init__(self, base):
840
1029
        SmartClientStreamMedium.__init__(self, base)
 
1030
        self._socket = None
841
1031
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
1032
 
846
1033
    def _accept_bytes(self, bytes):
847
1034
        """See SmartClientMedium.accept_bytes."""
848
1035
        self._ensure_connection()
849
1036
        osutils.send_all(self._socket, bytes, self._report_activity)
850
1037
 
 
1038
    def _ensure_connection(self):
 
1039
        """Connect this medium if not already connected."""
 
1040
        raise NotImplementedError(self._ensure_connection)
 
1041
 
 
1042
    def _flush(self):
 
1043
        """See SmartClientStreamMedium._flush().
 
1044
 
 
1045
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
1046
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
1047
        future.
 
1048
        """
 
1049
 
 
1050
    def _read_bytes(self, count):
 
1051
        """See SmartClientMedium.read_bytes."""
 
1052
        if not self._connected:
 
1053
            raise errors.MediumNotConnected(self)
 
1054
        return osutils.read_bytes_from_socket(
 
1055
            self._socket, self._report_activity)
 
1056
 
851
1057
    def disconnect(self):
852
1058
        """See SmartClientMedium.disconnect()."""
853
1059
        if not self._connected:
856
1062
        self._socket = None
857
1063
        self._connected = False
858
1064
 
 
1065
 
 
1066
class SmartTCPClientMedium(SmartClientSocketMedium):
 
1067
    """A client medium that creates a TCP connection."""
 
1068
 
 
1069
    def __init__(self, host, port, base):
 
1070
        """Creates a client that will connect on the first use."""
 
1071
        SmartClientSocketMedium.__init__(self, base)
 
1072
        self._host = host
 
1073
        self._port = port
 
1074
 
859
1075
    def _ensure_connection(self):
860
1076
        """Connect this medium if not already connected."""
861
1077
        if self._connected:
895
1111
                    (self._host, port, err_msg))
896
1112
        self._connected = True
897
1113
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
1114
 
 
1115
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
1116
    """A client medium for an already connected socket.
 
1117
    
 
1118
    Note that this class will assume it "owns" the socket, so it will close it
 
1119
    when its disconnect method is called.
 
1120
    """
 
1121
 
 
1122
    def __init__(self, base, sock):
 
1123
        SmartClientSocketMedium.__init__(self, base)
 
1124
        self._socket = sock
 
1125
        self._connected = True
 
1126
 
 
1127
    def _ensure_connection(self):
 
1128
        # Already connected, by definition!  So nothing to do.
 
1129
        pass
911
1130
 
912
1131
 
913
1132
class SmartClientStreamMediumRequest(SmartClientMediumRequest):