/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-18 15:43:47 UTC
  • mto: This revision was merged to the branch mainline in revision 6383.
  • Revision ID: jelmer@samba.org-20111218154347-d42sxp2qzn36uo2r
Add urlutils.quote / urlutils.unquote.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
28
29
import sys
29
 
import urllib
 
30
import time
30
31
 
 
32
import bzrlib
31
33
from bzrlib.lazy_import import lazy_import
32
34
lazy_import(globals(), """
33
 
import atexit
 
35
import select
34
36
import socket
35
37
import thread
36
38
import weakref
38
40
from bzrlib import (
39
41
    debug,
40
42
    errors,
41
 
    symbol_versioning,
42
43
    trace,
43
44
    ui,
44
45
    urlutils,
45
46
    )
46
 
from bzrlib.smart import client, protocol, request, vfs
 
47
from bzrlib.i18n import gettext
 
48
from bzrlib.smart import client, protocol, request, signals, vfs
47
49
from bzrlib.transport import ssh
48
50
""")
49
51
from bzrlib import osutils
176
178
        ui.ui_factory.report_transport_activity(self, bytes, direction)
177
179
 
178
180
 
 
181
_bad_file_descriptor = (errno.EBADF,)
 
182
if sys.platform == 'win32':
 
183
    # Given on Windows if you pass a closed socket to select.select. Probably
 
184
    # also given if you pass a file handle to select.
 
185
    WSAENOTSOCK = 10038
 
186
    _bad_file_descriptor += (WSAENOTSOCK,)
 
187
 
 
188
 
179
189
class SmartServerStreamMedium(SmartMedium):
180
190
    """Handles smart commands coming over a stream.
181
191
 
194
204
        the stream.  See also the _push_back method.
195
205
    """
196
206
 
197
 
    def __init__(self, backing_transport, root_client_path='/'):
 
207
    _timer = time.time
 
208
 
 
209
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
210
        """Construct new server.
199
211
 
200
212
        :param backing_transport: Transport for the directory served.
203
215
        self.backing_transport = backing_transport
204
216
        self.root_client_path = root_client_path
205
217
        self.finished = False
 
218
        if timeout is None:
 
219
            raise AssertionError('You must supply a timeout.')
 
220
        self._client_timeout = timeout
 
221
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
222
        SmartMedium.__init__(self)
207
223
 
208
224
    def serve(self):
214
230
            while not self.finished:
215
231
                server_protocol = self._build_protocol()
216
232
                self._serve_one_request(server_protocol)
 
233
        except errors.ConnectionTimeout, e:
 
234
            trace.note('%s' % (e,))
 
235
            trace.log_exception_quietly()
 
236
            self._disconnect_client()
 
237
            # We reported it, no reason to make a big fuss.
 
238
            return
217
239
        except Exception, e:
218
240
            stderr.write("%s terminating on exception %s\n" % (self, e))
219
241
            raise
 
242
        self._disconnect_client()
 
243
 
 
244
    def _stop_gracefully(self):
 
245
        """When we finish this message, stop looking for more."""
 
246
        trace.mutter('Stopping %s' % (self,))
 
247
        self.finished = True
 
248
 
 
249
    def _disconnect_client(self):
 
250
        """Close the current connection. We stopped due to a timeout/etc."""
 
251
        # The default implementation is a no-op, because that is all we used to
 
252
        # do when disconnecting from a client. I suppose we never had the
 
253
        # *server* initiate a disconnect, before
 
254
 
 
255
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
256
        """Wait for more bytes to be read, but timeout if none available.
 
257
 
 
258
        This allows us to detect idle connections, and stop trying to read from
 
259
        them, without setting the socket itself to non-blocking. This also
 
260
        allows us to specify when we watch for idle timeouts.
 
261
 
 
262
        :return: Did we timeout? (True if we timed out, False if there is data
 
263
            to be read)
 
264
        """
 
265
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
220
266
 
221
267
    def _build_protocol(self):
222
268
        """Identifies the version of the incoming request, and returns an
227
273
 
228
274
        :returns: a SmartServerRequestProtocol.
229
275
        """
 
276
        self._wait_for_bytes_with_timeout(self._client_timeout)
 
277
        if self.finished:
 
278
            # We're stopping, so don't try to do any more work
 
279
            return None
230
280
        bytes = self._get_line()
231
281
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
282
        protocol = protocol_factory(
234
284
        protocol.accept_bytes(unused_bytes)
235
285
        return protocol
236
286
 
 
287
    def _wait_on_descriptor(self, fd, timeout_seconds):
 
288
        """select() on a file descriptor, waiting for nonblocking read()
 
289
 
 
290
        This will raise a ConnectionTimeout exception if we do not get a
 
291
        readable handle before timeout_seconds.
 
292
        :return: None
 
293
        """
 
294
        t_end = self._timer() + timeout_seconds
 
295
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
 
296
        rs = xs = None
 
297
        while not rs and not xs and self._timer() < t_end:
 
298
            if self.finished:
 
299
                return
 
300
            try:
 
301
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
 
302
            except (select.error, socket.error) as e:
 
303
                err = getattr(e, 'errno', None)
 
304
                if err is None and getattr(e, 'args', None) is not None:
 
305
                    # select.error doesn't have 'errno', it just has args[0]
 
306
                    err = e.args[0]
 
307
                if err in _bad_file_descriptor:
 
308
                    return # Not a socket indicates read() will fail
 
309
                elif err == errno.EINTR:
 
310
                    # Interrupted, keep looping.
 
311
                    continue
 
312
                raise
 
313
        if rs or xs:
 
314
            return
 
315
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
 
316
                                       % (timeout_seconds,))
 
317
 
237
318
    def _serve_one_request(self, protocol):
238
319
        """Read one request from input, process, send back a response.
239
320
 
240
321
        :param protocol: a SmartServerRequestProtocol.
241
322
        """
 
323
        if protocol is None:
 
324
            return
242
325
        try:
243
326
            self._serve_one_request_unguarded(protocol)
244
327
        except KeyboardInterrupt:
260
343
 
261
344
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
262
345
 
263
 
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
346
    def __init__(self, sock, backing_transport, root_client_path='/',
 
347
                 timeout=None):
264
348
        """Constructor.
265
349
 
266
350
        :param sock: the socket the server will read from.  It will be put
267
351
            into blocking mode.
268
352
        """
269
353
        SmartServerStreamMedium.__init__(
270
 
            self, backing_transport, root_client_path=root_client_path)
 
354
            self, backing_transport, root_client_path=root_client_path,
 
355
            timeout=timeout)
271
356
        sock.setblocking(True)
272
357
        self.socket = sock
 
358
        # Get the getpeername now, as we might be closed later when we care.
 
359
        try:
 
360
            self._client_info = sock.getpeername()
 
361
        except socket.error:
 
362
            self._client_info = '<unknown>'
 
363
 
 
364
    def __str__(self):
 
365
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
 
366
 
 
367
    def __repr__(self):
 
368
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
 
369
            self._client_info)
273
370
 
274
371
    def _serve_one_request_unguarded(self, protocol):
275
372
        while protocol.next_read_size():
284
381
 
285
382
        self._push_back(protocol.unused_data)
286
383
 
 
384
    def _disconnect_client(self):
 
385
        """Close the current connection. We stopped due to a timeout/etc."""
 
386
        self.socket.close()
 
387
 
 
388
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
389
        """Wait for more bytes to be read, but timeout if none available.
 
390
 
 
391
        This allows us to detect idle connections, and stop trying to read from
 
392
        them, without setting the socket itself to non-blocking. This also
 
393
        allows us to specify when we watch for idle timeouts.
 
394
 
 
395
        :return: None, this will raise ConnectionTimeout if we time out before
 
396
            data is available.
 
397
        """
 
398
        return self._wait_on_descriptor(self.socket, timeout_seconds)
 
399
 
287
400
    def _read_bytes(self, desired_count):
288
401
        return osutils.read_bytes_from_socket(
289
402
            self.socket, self._report_activity)
306
419
 
307
420
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
308
421
 
309
 
    def __init__(self, in_file, out_file, backing_transport):
 
422
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
423
        """Construct new server.
311
424
 
312
425
        :param in_file: Python file from which requests can be read.
313
426
        :param out_file: Python file to write responses.
314
427
        :param backing_transport: Transport for the directory served.
315
428
        """
316
 
        SmartServerStreamMedium.__init__(self, backing_transport)
 
429
        SmartServerStreamMedium.__init__(self, backing_transport,
 
430
            timeout=timeout)
317
431
        if sys.platform == 'win32':
318
432
            # force binary mode for files
319
433
            import msvcrt
324
438
        self._in = in_file
325
439
        self._out = out_file
326
440
 
 
441
    def serve(self):
 
442
        """See SmartServerStreamMedium.serve"""
 
443
        # This is the regular serve, except it adds signal trapping for soft
 
444
        # shutdown.
 
445
        stop_gracefully = self._stop_gracefully
 
446
        signals.register_on_hangup(id(self), stop_gracefully)
 
447
        try:
 
448
            return super(SmartServerPipeStreamMedium, self).serve()
 
449
        finally:
 
450
            signals.unregister_on_hangup(id(self))
 
451
 
327
452
    def _serve_one_request_unguarded(self, protocol):
328
453
        while True:
329
454
            # We need to be careful not to read past the end of the current
342
467
                return
343
468
            protocol.accept_bytes(bytes)
344
469
 
 
470
    def _disconnect_client(self):
 
471
        self._in.close()
 
472
        self._out.flush()
 
473
        self._out.close()
 
474
 
 
475
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
476
        """Wait for more bytes to be read, but timeout if none available.
 
477
 
 
478
        This allows us to detect idle connections, and stop trying to read from
 
479
        them, without setting the socket itself to non-blocking. This also
 
480
        allows us to specify when we watch for idle timeouts.
 
481
 
 
482
        :return: None, this will raise ConnectionTimeout if we time out before
 
483
            data is available.
 
484
        """
 
485
        if (getattr(self._in, 'fileno', None) is None
 
486
            or sys.platform == 'win32'):
 
487
            # You can't select() file descriptors on Windows.
 
488
            return
 
489
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
490
 
345
491
    def _read_bytes(self, desired_count):
346
492
        return self._in.read(desired_count)
347
493
 
491
637
        return self._medium._get_line()
492
638
 
493
639
 
 
640
class _VfsRefuser(object):
 
641
    """An object that refuses all VFS requests.
 
642
 
 
643
    """
 
644
 
 
645
    def __init__(self):
 
646
        client._SmartClient.hooks.install_named_hook(
 
647
            'call', self.check_vfs, 'vfs refuser')
 
648
 
 
649
    def check_vfs(self, params):
 
650
        try:
 
651
            request_method = request.request_handlers.get(params.method)
 
652
        except KeyError:
 
653
            # A method we don't know about doesn't count as a VFS method.
 
654
            return
 
655
        if issubclass(request_method, vfs.VfsRequest):
 
656
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
 
657
 
 
658
 
494
659
class _DebugCounter(object):
495
660
    """An object that counts the HPSS calls made to each client medium.
496
661
 
497
 
    When a medium is garbage-collected, or failing that when atexit functions
498
 
    are run, the total number of calls made on that medium are reported via
499
 
    trace.note.
 
662
    When a medium is garbage-collected, or failing that when
 
663
    bzrlib.global_state exits, the total number of calls made on that medium
 
664
    are reported via trace.note.
500
665
    """
501
666
 
502
667
    def __init__(self):
503
668
        self.counts = weakref.WeakKeyDictionary()
504
669
        client._SmartClient.hooks.install_named_hook(
505
670
            'call', self.increment_call_count, 'hpss call counter')
506
 
        atexit.register(self.flush_all)
 
671
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
672
 
508
673
    def track(self, medium):
509
674
        """Start tracking calls made to a medium.
543
708
        value['count'] = 0
544
709
        value['vfs_count'] = 0
545
710
        if count != 0:
546
 
            trace.note('HPSS calls: %d (%d vfs) %s',
547
 
                       count, vfs_count, medium_repr)
 
711
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
 
712
                       count, vfs_count, medium_repr))
548
713
 
549
714
    def flush_all(self):
550
715
        for ref in list(self.counts.keys()):
551
716
            self.done(ref)
552
717
 
553
718
_debug_counter = None
 
719
_vfs_refuser = None
554
720
 
555
721
 
556
722
class SmartClientMedium(SmartMedium):
573
739
            if _debug_counter is None:
574
740
                _debug_counter = _DebugCounter()
575
741
            _debug_counter.track(self)
 
742
        if 'hpss_client_no_vfs' in debug.debug_flags:
 
743
            global _vfs_refuser
 
744
            if _vfs_refuser is None:
 
745
                _vfs_refuser = _VfsRefuser()
576
746
 
577
747
    def _is_remote_before(self, version_tuple):
578
748
        """Is it possible the remote side supports RPCs for a given version?
669
839
        """
670
840
        medium_base = urlutils.join(self.base, '/')
671
841
        rel_url = urlutils.relative_url(medium_base, transport.base)
672
 
        return urllib.unquote(rel_url)
 
842
        return urlutils.unquote(rel_url)
673
843
 
674
844
 
675
845
class SmartClientStreamMedium(SmartClientMedium):
710
880
        """
711
881
        return SmartClientStreamMediumRequest(self)
712
882
 
 
883
    def reset(self):
 
884
        """We have been disconnected, reset current state.
 
885
 
 
886
        This resets things like _current_request and connected state.
 
887
        """
 
888
        self.disconnect()
 
889
        self._current_request = None
 
890
 
713
891
 
714
892
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
715
893
    """A client medium using simple pipes.
716
894
 
717
895
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
896
    """
723
897
 
724
898
    def __init__(self, readable_pipe, writeable_pipe, base):
728
902
 
729
903
    def _accept_bytes(self, bytes):
730
904
        """See SmartClientStreamMedium.accept_bytes."""
731
 
        self._writeable_pipe.write(bytes)
 
905
        try:
 
906
            self._writeable_pipe.write(bytes)
 
907
        except IOError, e:
 
908
            if e.errno in (errno.EINVAL, errno.EPIPE):
 
909
                raise errors.ConnectionReset(
 
910
                    "Error trying to write to subprocess:\n%s" % (e,))
 
911
            raise
732
912
        self._report_activity(len(bytes), 'write')
733
913
 
734
914
    def _flush(self):
735
915
        """See SmartClientStreamMedium._flush()."""
 
916
        # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
 
917
        #       However, testing shows that even when the child process is
 
918
        #       gone, this doesn't error.
736
919
        self._writeable_pipe.flush()
737
920
 
738
921
    def _read_bytes(self, count):
739
922
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
923
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
924
        bytes = self._readable_pipe.read(bytes_to_read)
741
925
        self._report_activity(len(bytes), 'read')
742
926
        return bytes
743
927
 
744
928
 
 
929
class SSHParams(object):
 
930
    """A set of parameters for starting a remote bzr via SSH."""
 
931
 
 
932
    def __init__(self, host, port=None, username=None, password=None,
 
933
            bzr_remote_path='bzr'):
 
934
        self.host = host
 
935
        self.port = port
 
936
        self.username = username
 
937
        self.password = password
 
938
        self.bzr_remote_path = bzr_remote_path
 
939
 
 
940
 
745
941
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
747
 
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
942
    """A client medium using SSH.
 
943
 
 
944
    It delegates IO to a SmartSimplePipesClientMedium or
 
945
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
946
    """
 
947
 
 
948
    def __init__(self, base, ssh_params, vendor=None):
750
949
        """Creates a client that will connect on the first use.
751
950
 
 
951
        :param ssh_params: A SSHParams instance.
752
952
        :param vendor: An optional override for the ssh vendor to use. See
753
953
            bzrlib.transport.ssh for details on ssh vendors.
754
954
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
955
        self._real_medium = None
 
956
        self._ssh_params = ssh_params
760
957
        # for the benefit of progress making a short description of this
761
958
        # transport
762
959
        self._scheme = 'bzr+ssh'
764
961
        # _DebugCounter so we have to store all the values used in our repr
765
962
        # method before calling the super init.
766
963
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
964
        self._vendor = vendor
768
965
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
966
 
773
967
    def __repr__(self):
774
 
        if self._port is None:
 
968
        if self._ssh_params.port is None:
775
969
            maybe_port = ''
776
970
        else:
777
 
            maybe_port = ':%s' % self._port
778
 
        return "%s(%s://%s@%s%s/)" % (
 
971
            maybe_port = ':%s' % self._ssh_params.port
 
972
        if self._ssh_params.username is None:
 
973
            maybe_user = ''
 
974
        else:
 
975
            maybe_user = '%s@' % self._ssh_params.username
 
976
        return "%s(%s://%s%s%s/)" % (
779
977
            self.__class__.__name__,
780
978
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
979
            maybe_user,
 
980
            self._ssh_params.host,
783
981
            maybe_port)
784
982
 
785
983
    def _accept_bytes(self, bytes):
786
984
        """See SmartClientStreamMedium.accept_bytes."""
787
985
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
986
        self._real_medium.accept_bytes(bytes)
790
987
 
791
988
    def disconnect(self):
792
989
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
990
        if self._real_medium is not None:
 
991
            self._real_medium.disconnect()
 
992
            self._real_medium = None
 
993
        if self._ssh_connection is not None:
 
994
            self._ssh_connection.close()
 
995
            self._ssh_connection = None
799
996
 
800
997
    def _ensure_connection(self):
801
998
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
999
        if self._real_medium is not None:
803
1000
            return
804
1001
        if self._vendor is None:
805
1002
            vendor = ssh._get_ssh_vendor()
806
1003
        else:
807
1004
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
1005
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
1006
                self._ssh_params.password, self._ssh_params.host,
 
1007
                self._ssh_params.port,
 
1008
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
1009
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
1010
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
1011
        if io_kind == 'socket':
 
1012
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
1013
                self.base, io_object)
 
1014
        elif io_kind == 'pipes':
 
1015
            read_from, write_to = io_object
 
1016
            self._real_medium = SmartSimplePipesClientMedium(
 
1017
                read_from, write_to, self.base)
 
1018
        else:
 
1019
            raise AssertionError(
 
1020
                "Unexpected io_kind %r from %r"
 
1021
                % (io_kind, self._ssh_connection))
815
1022
 
816
1023
    def _flush(self):
817
1024
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
1025
        self._real_medium._flush()
819
1026
 
820
1027
    def _read_bytes(self, count):
821
1028
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
1029
        if self._real_medium is None:
823
1030
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
1031
        return self._real_medium.read_bytes(count)
828
1032
 
829
1033
 
830
1034
# Port 4155 is the default port for bzr://, registered with IANA.
832
1036
BZR_DEFAULT_PORT = 4155
833
1037
 
834
1038
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
1039
class SmartClientSocketMedium(SmartClientStreamMedium):
 
1040
    """A client medium using a socket.
 
1041
    
 
1042
    This class isn't usable directly.  Use one of its subclasses instead.
 
1043
    """
837
1044
 
838
 
    def __init__(self, host, port, base):
839
 
        """Creates a client that will connect on the first use."""
 
1045
    def __init__(self, base):
840
1046
        SmartClientStreamMedium.__init__(self, base)
 
1047
        self._socket = None
841
1048
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
1049
 
846
1050
    def _accept_bytes(self, bytes):
847
1051
        """See SmartClientMedium.accept_bytes."""
848
1052
        self._ensure_connection()
849
1053
        osutils.send_all(self._socket, bytes, self._report_activity)
850
1054
 
 
1055
    def _ensure_connection(self):
 
1056
        """Connect this medium if not already connected."""
 
1057
        raise NotImplementedError(self._ensure_connection)
 
1058
 
 
1059
    def _flush(self):
 
1060
        """See SmartClientStreamMedium._flush().
 
1061
 
 
1062
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
1063
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
1064
        future.
 
1065
        """
 
1066
 
 
1067
    def _read_bytes(self, count):
 
1068
        """See SmartClientMedium.read_bytes."""
 
1069
        if not self._connected:
 
1070
            raise errors.MediumNotConnected(self)
 
1071
        return osutils.read_bytes_from_socket(
 
1072
            self._socket, self._report_activity)
 
1073
 
851
1074
    def disconnect(self):
852
1075
        """See SmartClientMedium.disconnect()."""
853
1076
        if not self._connected:
856
1079
        self._socket = None
857
1080
        self._connected = False
858
1081
 
 
1082
 
 
1083
class SmartTCPClientMedium(SmartClientSocketMedium):
 
1084
    """A client medium that creates a TCP connection."""
 
1085
 
 
1086
    def __init__(self, host, port, base):
 
1087
        """Creates a client that will connect on the first use."""
 
1088
        SmartClientSocketMedium.__init__(self, base)
 
1089
        self._host = host
 
1090
        self._port = port
 
1091
 
859
1092
    def _ensure_connection(self):
860
1093
        """Connect this medium if not already connected."""
861
1094
        if self._connected:
895
1128
                    (self._host, port, err_msg))
896
1129
        self._connected = True
897
1130
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
1131
 
 
1132
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
1133
    """A client medium for an already connected socket.
 
1134
    
 
1135
    Note that this class will assume it "owns" the socket, so it will close it
 
1136
    when its disconnect method is called.
 
1137
    """
 
1138
 
 
1139
    def __init__(self, base, sock):
 
1140
        SmartClientSocketMedium.__init__(self, base)
 
1141
        self._socket = sock
 
1142
        self._connected = True
 
1143
 
 
1144
    def _ensure_connection(self):
 
1145
        # Already connected, by definition!  So nothing to do.
 
1146
        pass
911
1147
 
912
1148
 
913
1149
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
948
1184
        This invokes self._medium._flush to ensure all bytes are transmitted.
949
1185
        """
950
1186
        self._medium._flush()
951
 
 
952