/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
28
import sys
30
 
import time
31
29
import urllib
32
30
 
33
 
import bzrlib
34
31
from bzrlib.lazy_import import lazy_import
35
32
lazy_import(globals(), """
36
 
import select
 
33
import atexit
37
34
import socket
38
35
import thread
39
36
import weakref
41
38
from bzrlib import (
42
39
    debug,
43
40
    errors,
 
41
    symbol_versioning,
44
42
    trace,
45
43
    ui,
46
44
    urlutils,
47
45
    )
48
 
from bzrlib.i18n import gettext
49
 
from bzrlib.smart import client, protocol, request, signals, vfs
 
46
from bzrlib.smart import client, protocol, request, vfs
50
47
from bzrlib.transport import ssh
51
48
""")
52
49
from bzrlib import osutils
179
176
        ui.ui_factory.report_transport_activity(self, bytes, direction)
180
177
 
181
178
 
182
 
_bad_file_descriptor = (errno.EBADF,)
183
 
if sys.platform == 'win32':
184
 
    # Given on Windows if you pass a closed socket to select.select. Probably
185
 
    # also given if you pass a file handle to select.
186
 
    WSAENOTSOCK = 10038
187
 
    _bad_file_descriptor += (WSAENOTSOCK,)
188
 
 
189
 
 
190
179
class SmartServerStreamMedium(SmartMedium):
191
180
    """Handles smart commands coming over a stream.
192
181
 
205
194
        the stream.  See also the _push_back method.
206
195
    """
207
196
 
208
 
    _timer = time.time
209
 
 
210
 
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
 
197
    def __init__(self, backing_transport, root_client_path='/'):
211
198
        """Construct new server.
212
199
 
213
200
        :param backing_transport: Transport for the directory served.
216
203
        self.backing_transport = backing_transport
217
204
        self.root_client_path = root_client_path
218
205
        self.finished = False
219
 
        if timeout is None:
220
 
            raise AssertionError('You must supply a timeout.')
221
 
        self._client_timeout = timeout
222
 
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
206
        SmartMedium.__init__(self)
224
207
 
225
208
    def serve(self):
231
214
            while not self.finished:
232
215
                server_protocol = self._build_protocol()
233
216
                self._serve_one_request(server_protocol)
234
 
        except errors.ConnectionTimeout, e:
235
 
            trace.note('%s' % (e,))
236
 
            trace.log_exception_quietly()
237
 
            self._disconnect_client()
238
 
            # We reported it, no reason to make a big fuss.
239
 
            return
240
217
        except Exception, e:
241
218
            stderr.write("%s terminating on exception %s\n" % (self, e))
242
219
            raise
243
 
        self._disconnect_client()
244
 
 
245
 
    def _stop_gracefully(self):
246
 
        """When we finish this message, stop looking for more."""
247
 
        trace.mutter('Stopping %s' % (self,))
248
 
        self.finished = True
249
 
 
250
 
    def _disconnect_client(self):
251
 
        """Close the current connection. We stopped due to a timeout/etc."""
252
 
        # The default implementation is a no-op, because that is all we used to
253
 
        # do when disconnecting from a client. I suppose we never had the
254
 
        # *server* initiate a disconnect, before
255
 
 
256
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
257
 
        """Wait for more bytes to be read, but timeout if none available.
258
 
 
259
 
        This allows us to detect idle connections, and stop trying to read from
260
 
        them, without setting the socket itself to non-blocking. This also
261
 
        allows us to specify when we watch for idle timeouts.
262
 
 
263
 
        :return: Did we timeout? (True if we timed out, False if there is data
264
 
            to be read)
265
 
        """
266
 
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
267
220
 
268
221
    def _build_protocol(self):
269
222
        """Identifies the version of the incoming request, and returns an
274
227
 
275
228
        :returns: a SmartServerRequestProtocol.
276
229
        """
277
 
        self._wait_for_bytes_with_timeout(self._client_timeout)
278
 
        if self.finished:
279
 
            # We're stopping, so don't try to do any more work
280
 
            return None
281
230
        bytes = self._get_line()
282
231
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
283
232
        protocol = protocol_factory(
285
234
        protocol.accept_bytes(unused_bytes)
286
235
        return protocol
287
236
 
288
 
    def _wait_on_descriptor(self, fd, timeout_seconds):
289
 
        """select() on a file descriptor, waiting for nonblocking read()
290
 
 
291
 
        This will raise a ConnectionTimeout exception if we do not get a
292
 
        readable handle before timeout_seconds.
293
 
        :return: None
294
 
        """
295
 
        t_end = self._timer() + timeout_seconds
296
 
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
297
 
        rs = xs = None
298
 
        while not rs and not xs and self._timer() < t_end:
299
 
            if self.finished:
300
 
                return
301
 
            try:
302
 
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
303
 
            except (select.error, socket.error) as e:
304
 
                err = getattr(e, 'errno', None)
305
 
                if err is None and getattr(e, 'args', None) is not None:
306
 
                    # select.error doesn't have 'errno', it just has args[0]
307
 
                    err = e.args[0]
308
 
                if err in _bad_file_descriptor:
309
 
                    return # Not a socket indicates read() will fail
310
 
                elif err == errno.EINTR:
311
 
                    # Interrupted, keep looping.
312
 
                    continue
313
 
                raise
314
 
        if rs or xs:
315
 
            return
316
 
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
317
 
                                       % (timeout_seconds,))
318
 
 
319
237
    def _serve_one_request(self, protocol):
320
238
        """Read one request from input, process, send back a response.
321
239
 
322
240
        :param protocol: a SmartServerRequestProtocol.
323
241
        """
324
 
        if protocol is None:
325
 
            return
326
242
        try:
327
243
            self._serve_one_request_unguarded(protocol)
328
244
        except KeyboardInterrupt:
344
260
 
345
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
346
262
 
347
 
    def __init__(self, sock, backing_transport, root_client_path='/',
348
 
                 timeout=None):
 
263
    def __init__(self, sock, backing_transport, root_client_path='/'):
349
264
        """Constructor.
350
265
 
351
266
        :param sock: the socket the server will read from.  It will be put
352
267
            into blocking mode.
353
268
        """
354
269
        SmartServerStreamMedium.__init__(
355
 
            self, backing_transport, root_client_path=root_client_path,
356
 
            timeout=timeout)
 
270
            self, backing_transport, root_client_path=root_client_path)
357
271
        sock.setblocking(True)
358
272
        self.socket = sock
359
 
        # Get the getpeername now, as we might be closed later when we care.
360
 
        try:
361
 
            self._client_info = sock.getpeername()
362
 
        except socket.error:
363
 
            self._client_info = '<unknown>'
364
 
 
365
 
    def __str__(self):
366
 
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
367
 
 
368
 
    def __repr__(self):
369
 
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
370
 
            self._client_info)
371
273
 
372
274
    def _serve_one_request_unguarded(self, protocol):
373
275
        while protocol.next_read_size():
382
284
 
383
285
        self._push_back(protocol.unused_data)
384
286
 
385
 
    def _disconnect_client(self):
386
 
        """Close the current connection. We stopped due to a timeout/etc."""
387
 
        self.socket.close()
388
 
 
389
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
390
 
        """Wait for more bytes to be read, but timeout if none available.
391
 
 
392
 
        This allows us to detect idle connections, and stop trying to read from
393
 
        them, without setting the socket itself to non-blocking. This also
394
 
        allows us to specify when we watch for idle timeouts.
395
 
 
396
 
        :return: None, this will raise ConnectionTimeout if we time out before
397
 
            data is available.
398
 
        """
399
 
        return self._wait_on_descriptor(self.socket, timeout_seconds)
400
 
 
401
287
    def _read_bytes(self, desired_count):
402
288
        return osutils.read_bytes_from_socket(
403
289
            self.socket, self._report_activity)
420
306
 
421
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
422
308
 
423
 
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
 
309
    def __init__(self, in_file, out_file, backing_transport):
424
310
        """Construct new server.
425
311
 
426
312
        :param in_file: Python file from which requests can be read.
427
313
        :param out_file: Python file to write responses.
428
314
        :param backing_transport: Transport for the directory served.
429
315
        """
430
 
        SmartServerStreamMedium.__init__(self, backing_transport,
431
 
            timeout=timeout)
 
316
        SmartServerStreamMedium.__init__(self, backing_transport)
432
317
        if sys.platform == 'win32':
433
318
            # force binary mode for files
434
319
            import msvcrt
439
324
        self._in = in_file
440
325
        self._out = out_file
441
326
 
442
 
    def serve(self):
443
 
        """See SmartServerStreamMedium.serve"""
444
 
        # This is the regular serve, except it adds signal trapping for soft
445
 
        # shutdown.
446
 
        stop_gracefully = self._stop_gracefully
447
 
        signals.register_on_hangup(id(self), stop_gracefully)
448
 
        try:
449
 
            return super(SmartServerPipeStreamMedium, self).serve()
450
 
        finally:
451
 
            signals.unregister_on_hangup(id(self))
452
 
 
453
327
    def _serve_one_request_unguarded(self, protocol):
454
328
        while True:
455
329
            # We need to be careful not to read past the end of the current
468
342
                return
469
343
            protocol.accept_bytes(bytes)
470
344
 
471
 
    def _disconnect_client(self):
472
 
        self._in.close()
473
 
        self._out.flush()
474
 
        self._out.close()
475
 
 
476
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
477
 
        """Wait for more bytes to be read, but timeout if none available.
478
 
 
479
 
        This allows us to detect idle connections, and stop trying to read from
480
 
        them, without setting the socket itself to non-blocking. This also
481
 
        allows us to specify when we watch for idle timeouts.
482
 
 
483
 
        :return: None, this will raise ConnectionTimeout if we time out before
484
 
            data is available.
485
 
        """
486
 
        if (getattr(self._in, 'fileno', None) is None
487
 
            or sys.platform == 'win32'):
488
 
            # You can't select() file descriptors on Windows.
489
 
            return
490
 
        return self._wait_on_descriptor(self._in, timeout_seconds)
491
 
 
492
345
    def _read_bytes(self, desired_count):
493
346
        return self._in.read(desired_count)
494
347
 
638
491
        return self._medium._get_line()
639
492
 
640
493
 
641
 
class _VfsRefuser(object):
642
 
    """An object that refuses all VFS requests.
643
 
 
644
 
    """
645
 
 
646
 
    def __init__(self):
647
 
        client._SmartClient.hooks.install_named_hook(
648
 
            'call', self.check_vfs, 'vfs refuser')
649
 
 
650
 
    def check_vfs(self, params):
651
 
        try:
652
 
            request_method = request.request_handlers.get(params.method)
653
 
        except KeyError:
654
 
            # A method we don't know about doesn't count as a VFS method.
655
 
            return
656
 
        if issubclass(request_method, vfs.VfsRequest):
657
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
658
 
 
659
 
 
660
494
class _DebugCounter(object):
661
495
    """An object that counts the HPSS calls made to each client medium.
662
496
 
663
 
    When a medium is garbage-collected, or failing that when
664
 
    bzrlib.global_state exits, the total number of calls made on that medium
665
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
666
500
    """
667
501
 
668
502
    def __init__(self):
669
503
        self.counts = weakref.WeakKeyDictionary()
670
504
        client._SmartClient.hooks.install_named_hook(
671
505
            'call', self.increment_call_count, 'hpss call counter')
672
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
673
507
 
674
508
    def track(self, medium):
675
509
        """Start tracking calls made to a medium.
709
543
        value['count'] = 0
710
544
        value['vfs_count'] = 0
711
545
        if count != 0:
712
 
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
713
 
                       count, vfs_count, medium_repr))
 
546
            trace.note('HPSS calls: %d (%d vfs) %s',
 
547
                       count, vfs_count, medium_repr)
714
548
 
715
549
    def flush_all(self):
716
550
        for ref in list(self.counts.keys()):
717
551
            self.done(ref)
718
552
 
719
553
_debug_counter = None
720
 
_vfs_refuser = None
721
554
 
722
555
 
723
556
class SmartClientMedium(SmartMedium):
740
573
            if _debug_counter is None:
741
574
                _debug_counter = _DebugCounter()
742
575
            _debug_counter.track(self)
743
 
        if 'hpss_client_no_vfs' in debug.debug_flags:
744
 
            global _vfs_refuser
745
 
            if _vfs_refuser is None:
746
 
                _vfs_refuser = _VfsRefuser()
747
576
 
748
577
    def _is_remote_before(self, version_tuple):
749
578
        """Is it possible the remote side supports RPCs for a given version?
881
710
        """
882
711
        return SmartClientStreamMediumRequest(self)
883
712
 
884
 
    def reset(self):
885
 
        """We have been disconnected, reset current state.
886
 
 
887
 
        This resets things like _current_request and connected state.
888
 
        """
889
 
        self.disconnect()
890
 
        self._current_request = None
891
 
 
892
713
 
893
714
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
894
715
    """A client medium using simple pipes.
895
716
 
896
717
    This client does not manage the pipes: it assumes they will always be open.
 
718
 
 
719
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
720
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
721
    (such as used for sys.stdin) are safe.
897
722
    """
898
723
 
899
724
    def __init__(self, readable_pipe, writeable_pipe, base):
903
728
 
904
729
    def _accept_bytes(self, bytes):
905
730
        """See SmartClientStreamMedium.accept_bytes."""
906
 
        try:
907
 
            self._writeable_pipe.write(bytes)
908
 
        except IOError, e:
909
 
            if e.errno in (errno.EINVAL, errno.EPIPE):
910
 
                raise errors.ConnectionReset(
911
 
                    "Error trying to write to subprocess:\n%s" % (e,))
912
 
            raise
 
731
        self._writeable_pipe.write(bytes)
913
732
        self._report_activity(len(bytes), 'write')
914
733
 
915
734
    def _flush(self):
916
735
        """See SmartClientStreamMedium._flush()."""
917
 
        # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
918
 
        #       However, testing shows that even when the child process is
919
 
        #       gone, this doesn't error.
920
736
        self._writeable_pipe.flush()
921
737
 
922
738
    def _read_bytes(self, count):
923
739
        """See SmartClientStreamMedium._read_bytes."""
924
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
925
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
740
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
926
741
        self._report_activity(len(bytes), 'read')
927
742
        return bytes
928
743
 
929
744
 
930
 
class SSHParams(object):
931
 
    """A set of parameters for starting a remote bzr via SSH."""
 
745
class SmartSSHClientMedium(SmartClientStreamMedium):
 
746
    """A client medium using SSH."""
932
747
 
933
748
    def __init__(self, host, port=None, username=None, password=None,
934
 
            bzr_remote_path='bzr'):
935
 
        self.host = host
936
 
        self.port = port
937
 
        self.username = username
938
 
        self.password = password
939
 
        self.bzr_remote_path = bzr_remote_path
940
 
 
941
 
 
942
 
class SmartSSHClientMedium(SmartClientStreamMedium):
943
 
    """A client medium using SSH.
944
 
 
945
 
    It delegates IO to a SmartSimplePipesClientMedium or
946
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
947
 
    """
948
 
 
949
 
    def __init__(self, base, ssh_params, vendor=None):
 
749
            base=None, vendor=None, bzr_remote_path=None):
950
750
        """Creates a client that will connect on the first use.
951
751
 
952
 
        :param ssh_params: A SSHParams instance.
953
752
        :param vendor: An optional override for the ssh vendor to use. See
954
753
            bzrlib.transport.ssh for details on ssh vendors.
955
754
        """
956
 
        self._real_medium = None
957
 
        self._ssh_params = ssh_params
 
755
        self._connected = False
 
756
        self._host = host
 
757
        self._password = password
 
758
        self._port = port
 
759
        self._username = username
958
760
        # for the benefit of progress making a short description of this
959
761
        # transport
960
762
        self._scheme = 'bzr+ssh'
962
764
        # _DebugCounter so we have to store all the values used in our repr
963
765
        # method before calling the super init.
964
766
        SmartClientStreamMedium.__init__(self, base)
 
767
        self._read_from = None
 
768
        self._ssh_connection = None
965
769
        self._vendor = vendor
966
 
        self._ssh_connection = None
 
770
        self._write_to = None
 
771
        self._bzr_remote_path = bzr_remote_path
967
772
 
968
773
    def __repr__(self):
969
 
        if self._ssh_params.port is None:
 
774
        if self._port is None:
970
775
            maybe_port = ''
971
776
        else:
972
 
            maybe_port = ':%s' % self._ssh_params.port
973
 
        if self._ssh_params.username is None:
974
 
            maybe_user = ''
975
 
        else:
976
 
            maybe_user = '%s@' % self._ssh_params.username
977
 
        return "%s(%s://%s%s%s/)" % (
 
777
            maybe_port = ':%s' % self._port
 
778
        return "%s(%s://%s@%s%s/)" % (
978
779
            self.__class__.__name__,
979
780
            self._scheme,
980
 
            maybe_user,
981
 
            self._ssh_params.host,
 
781
            self._username,
 
782
            self._host,
982
783
            maybe_port)
983
784
 
984
785
    def _accept_bytes(self, bytes):
985
786
        """See SmartClientStreamMedium.accept_bytes."""
986
787
        self._ensure_connection()
987
 
        self._real_medium.accept_bytes(bytes)
 
788
        self._write_to.write(bytes)
 
789
        self._report_activity(len(bytes), 'write')
988
790
 
989
791
    def disconnect(self):
990
792
        """See SmartClientMedium.disconnect()."""
991
 
        if self._real_medium is not None:
992
 
            self._real_medium.disconnect()
993
 
            self._real_medium = None
994
 
        if self._ssh_connection is not None:
995
 
            self._ssh_connection.close()
996
 
            self._ssh_connection = None
 
793
        if not self._connected:
 
794
            return
 
795
        self._read_from.close()
 
796
        self._write_to.close()
 
797
        self._ssh_connection.close()
 
798
        self._connected = False
997
799
 
998
800
    def _ensure_connection(self):
999
801
        """Connect this medium if not already connected."""
1000
 
        if self._real_medium is not None:
 
802
        if self._connected:
1001
803
            return
1002
804
        if self._vendor is None:
1003
805
            vendor = ssh._get_ssh_vendor()
1004
806
        else:
1005
807
            vendor = self._vendor
1006
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1007
 
                self._ssh_params.password, self._ssh_params.host,
1008
 
                self._ssh_params.port,
1009
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
808
        self._ssh_connection = vendor.connect_ssh(self._username,
 
809
                self._password, self._host, self._port,
 
810
                command=[self._bzr_remote_path, 'serve', '--inet',
1010
811
                         '--directory=/', '--allow-writes'])
1011
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1012
 
        if io_kind == 'socket':
1013
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1014
 
                self.base, io_object)
1015
 
        elif io_kind == 'pipes':
1016
 
            read_from, write_to = io_object
1017
 
            self._real_medium = SmartSimplePipesClientMedium(
1018
 
                read_from, write_to, self.base)
1019
 
        else:
1020
 
            raise AssertionError(
1021
 
                "Unexpected io_kind %r from %r"
1022
 
                % (io_kind, self._ssh_connection))
 
812
        self._read_from, self._write_to = \
 
813
            self._ssh_connection.get_filelike_channels()
 
814
        self._connected = True
1023
815
 
1024
816
    def _flush(self):
1025
817
        """See SmartClientStreamMedium._flush()."""
1026
 
        self._real_medium._flush()
 
818
        self._write_to.flush()
1027
819
 
1028
820
    def _read_bytes(self, count):
1029
821
        """See SmartClientStreamMedium.read_bytes."""
1030
 
        if self._real_medium is None:
 
822
        if not self._connected:
1031
823
            raise errors.MediumNotConnected(self)
1032
 
        return self._real_medium.read_bytes(count)
 
824
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
825
        bytes = self._read_from.read(bytes_to_read)
 
826
        self._report_activity(len(bytes), 'read')
 
827
        return bytes
1033
828
 
1034
829
 
1035
830
# Port 4155 is the default port for bzr://, registered with IANA.
1037
832
BZR_DEFAULT_PORT = 4155
1038
833
 
1039
834
 
1040
 
class SmartClientSocketMedium(SmartClientStreamMedium):
1041
 
    """A client medium using a socket.
1042
 
    
1043
 
    This class isn't usable directly.  Use one of its subclasses instead.
1044
 
    """
 
835
class SmartTCPClientMedium(SmartClientStreamMedium):
 
836
    """A client medium using TCP."""
1045
837
 
1046
 
    def __init__(self, base):
 
838
    def __init__(self, host, port, base):
 
839
        """Creates a client that will connect on the first use."""
1047
840
        SmartClientStreamMedium.__init__(self, base)
 
841
        self._connected = False
 
842
        self._host = host
 
843
        self._port = port
1048
844
        self._socket = None
1049
 
        self._connected = False
1050
845
 
1051
846
    def _accept_bytes(self, bytes):
1052
847
        """See SmartClientMedium.accept_bytes."""
1053
848
        self._ensure_connection()
1054
849
        osutils.send_all(self._socket, bytes, self._report_activity)
1055
850
 
1056
 
    def _ensure_connection(self):
1057
 
        """Connect this medium if not already connected."""
1058
 
        raise NotImplementedError(self._ensure_connection)
1059
 
 
1060
 
    def _flush(self):
1061
 
        """See SmartClientStreamMedium._flush().
1062
 
 
1063
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
1064
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
1065
 
        future.
1066
 
        """
1067
 
 
1068
 
    def _read_bytes(self, count):
1069
 
        """See SmartClientMedium.read_bytes."""
1070
 
        if not self._connected:
1071
 
            raise errors.MediumNotConnected(self)
1072
 
        return osutils.read_bytes_from_socket(
1073
 
            self._socket, self._report_activity)
1074
 
 
1075
851
    def disconnect(self):
1076
852
        """See SmartClientMedium.disconnect()."""
1077
853
        if not self._connected:
1080
856
        self._socket = None
1081
857
        self._connected = False
1082
858
 
1083
 
 
1084
 
class SmartTCPClientMedium(SmartClientSocketMedium):
1085
 
    """A client medium that creates a TCP connection."""
1086
 
 
1087
 
    def __init__(self, host, port, base):
1088
 
        """Creates a client that will connect on the first use."""
1089
 
        SmartClientSocketMedium.__init__(self, base)
1090
 
        self._host = host
1091
 
        self._port = port
1092
 
 
1093
859
    def _ensure_connection(self):
1094
860
        """Connect this medium if not already connected."""
1095
861
        if self._connected:
1129
895
                    (self._host, port, err_msg))
1130
896
        self._connected = True
1131
897
 
1132
 
 
1133
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1134
 
    """A client medium for an already connected socket.
1135
 
    
1136
 
    Note that this class will assume it "owns" the socket, so it will close it
1137
 
    when its disconnect method is called.
1138
 
    """
1139
 
 
1140
 
    def __init__(self, base, sock):
1141
 
        SmartClientSocketMedium.__init__(self, base)
1142
 
        self._socket = sock
1143
 
        self._connected = True
1144
 
 
1145
 
    def _ensure_connection(self):
1146
 
        # Already connected, by definition!  So nothing to do.
1147
 
        pass
 
898
    def _flush(self):
 
899
        """See SmartClientStreamMedium._flush().
 
900
 
 
901
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
902
        add a means to do a flush, but that can be done in the future.
 
903
        """
 
904
 
 
905
    def _read_bytes(self, count):
 
906
        """See SmartClientMedium.read_bytes."""
 
907
        if not self._connected:
 
908
            raise errors.MediumNotConnected(self)
 
909
        return osutils.read_bytes_from_socket(
 
910
            self._socket, self._report_activity)
1148
911
 
1149
912
 
1150
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1185
948
        This invokes self._medium._flush to ensure all bytes are transmitted.
1186
949
        """
1187
950
        self._medium._flush()
 
951
 
 
952