478
455
if not line.endswith('\n'):
479
456
# end of file encountered reading from server
480
457
raise errors.ConnectionReset(
481
"Unexpected end of message. Please check connectivity "
482
"and permissions, and report a bug if problems persist.")
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
485
462
def _read_line(self):
486
463
"""Helper for SmartClientMediumRequest.read_line.
488
465
By default this forwards to self._medium._get_line because we are
489
466
operating on the medium's stream.
491
468
return self._medium._get_line()
494
class _DebugCounter(object):
495
"""An object that counts the HPSS calls made to each client medium.
497
When a medium is garbage-collected, or failing that when
498
bzrlib.global_state exits, the total number of calls made on that medium
499
are reported via trace.note.
503
self.counts = weakref.WeakKeyDictionary()
504
client._SmartClient.hooks.install_named_hook(
505
'call', self.increment_call_count, 'hpss call counter')
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
508
def track(self, medium):
509
"""Start tracking calls made to a medium.
511
This only keeps a weakref to the medium, so shouldn't affect the
514
medium_repr = repr(medium)
515
# Add this medium to the WeakKeyDictionary
516
self.counts[medium] = dict(count=0, vfs_count=0,
517
medium_repr=medium_repr)
518
# Weakref callbacks are fired in reverse order of their association
519
# with the referenced object. So we add a weakref *after* adding to
520
# the WeakKeyDict so that we can report the value from it before the
521
# entry is removed by the WeakKeyDict's own callback.
522
ref = weakref.ref(medium, self.done)
524
def increment_call_count(self, params):
525
# Increment the count in the WeakKeyDictionary
526
value = self.counts[params.medium]
529
request_method = request.request_handlers.get(params.method)
531
# A method we don't know about doesn't count as a VFS method.
533
if issubclass(request_method, vfs.VfsRequest):
534
value['vfs_count'] += 1
537
value = self.counts[ref]
538
count, vfs_count, medium_repr = (
539
value['count'], value['vfs_count'], value['medium_repr'])
540
# In case this callback is invoked for the same ref twice (by the
541
# weakref callback and by the atexit function), set the call count back
542
# to 0 so this item won't be reported twice.
544
value['vfs_count'] = 0
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
550
for ref in list(self.counts.keys()):
553
_debug_counter = None
556
471
class SmartClientMedium(SmartMedium):
557
472
"""Smart client is a medium for sending smart protocol requests over."""
734
632
def _read_bytes(self, count):
735
633
"""See SmartClientStreamMedium._read_bytes."""
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
738
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
743
"""A set of parameters for starting a remote bzr via SSH."""
634
return self._readable_pipe.read(count)
637
class SmartSSHClientMedium(SmartClientStreamMedium):
638
"""A client medium using SSH."""
745
640
def __init__(self, host, port=None, username=None, password=None,
746
bzr_remote_path='bzr'):
749
self.username = username
750
self.password = password
751
self.bzr_remote_path = bzr_remote_path
754
class SmartSSHClientMedium(SmartClientStreamMedium):
755
"""A client medium using SSH.
757
It delegates IO to a SmartClientSocketMedium or
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
761
def __init__(self, base, ssh_params, vendor=None):
641
base=None, vendor=None, bzr_remote_path=None):
762
642
"""Creates a client that will connect on the first use.
764
:param ssh_params: A SSHParams instance.
765
644
:param vendor: An optional override for the ssh vendor to use. See
766
645
bzrlib.transport.ssh for details on ssh vendors.
768
self._real_medium = None
769
self._ssh_params = ssh_params
770
# for the benefit of progress making a short description of this
772
self._scheme = 'bzr+ssh'
773
# SmartClientStreamMedium stores the repr of this object in its
774
# _DebugCounter so we have to store all the values used in our repr
775
# method before calling the super init.
776
647
SmartClientStreamMedium.__init__(self, base)
648
self._connected = False
650
self._password = password
652
self._username = username
653
self._read_from = None
654
self._ssh_connection = None
777
655
self._vendor = vendor
778
self._ssh_connection = None
781
if self._ssh_params.port is None:
784
maybe_port = ':%s' % self._ssh_params.port
785
return "%s(%s://%s@%s%s/)" % (
786
self.__class__.__name__,
788
self._ssh_params.username,
789
self._ssh_params.host,
656
self._write_to = None
657
self._bzr_remote_path = bzr_remote_path
658
if self._bzr_remote_path is None:
659
symbol_versioning.warn(
660
'bzr_remote_path is required as of bzr 0.92',
661
DeprecationWarning, stacklevel=2)
662
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
792
664
def _accept_bytes(self, bytes):
793
665
"""See SmartClientStreamMedium.accept_bytes."""
794
666
self._ensure_connection()
795
self._real_medium.accept_bytes(bytes)
667
self._write_to.write(bytes)
797
669
def disconnect(self):
798
670
"""See SmartClientMedium.disconnect()."""
799
if self._real_medium is not None:
800
self._real_medium.disconnect()
801
self._real_medium = None
802
if self._ssh_connection is not None:
803
self._ssh_connection.close()
804
self._ssh_connection = None
671
if not self._connected:
673
self._read_from.close()
674
self._write_to.close()
675
self._ssh_connection.close()
676
self._connected = False
806
678
def _ensure_connection(self):
807
679
"""Connect this medium if not already connected."""
808
if self._real_medium is not None:
810
682
if self._vendor is None:
811
683
vendor = ssh._get_ssh_vendor()
813
685
vendor = self._vendor
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
686
self._ssh_connection = vendor.connect_ssh(self._username,
687
self._password, self._host, self._port,
688
command=[self._bzr_remote_path, 'serve', '--inet',
818
689
'--directory=/', '--allow-writes'])
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
if io_kind == 'socket':
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
self.base, io_object)
823
elif io_kind == 'pipes':
824
read_from, write_to = io_object
825
self._real_medium = SmartSimplePipesClientMedium(
826
read_from, write_to, self.base)
828
raise AssertionError(
829
"Unexpected io_kind %r from %r"
830
% (io_kind, self._ssh_connection))
690
self._read_from, self._write_to = \
691
self._ssh_connection.get_filelike_channels()
692
self._connected = True
832
694
def _flush(self):
833
695
"""See SmartClientStreamMedium._flush()."""
834
self._real_medium._flush()
696
self._write_to.flush()
836
698
def _read_bytes(self, count):
837
699
"""See SmartClientStreamMedium.read_bytes."""
838
if self._real_medium is None:
700
if not self._connected:
839
701
raise errors.MediumNotConnected(self)
840
return self._real_medium.read_bytes(count)
702
bytes_to_read = min(count, _MAX_READ_SIZE)
703
return self._read_from.read(bytes_to_read)
843
706
# Port 4155 is the default port for bzr://, registered with IANA.
844
BZR_DEFAULT_INTERFACE = None
707
BZR_DEFAULT_INTERFACE = '0.0.0.0'
845
708
BZR_DEFAULT_PORT = 4155
848
class SmartClientSocketMedium(SmartClientStreamMedium):
849
"""A client medium using a socket.
711
class SmartTCPClientMedium(SmartClientStreamMedium):
712
"""A client medium using TCP."""
851
This class isn't usable directly. Use one of its subclasses instead.
854
def __init__(self, base):
714
def __init__(self, host, port, base):
715
"""Creates a client that will connect on the first use."""
855
716
SmartClientStreamMedium.__init__(self, base)
717
self._connected = False
856
720
self._socket = None
857
self._connected = False
859
722
def _accept_bytes(self, bytes):
860
723
"""See SmartClientMedium.accept_bytes."""
861
724
self._ensure_connection()
862
osutils.send_all(self._socket, bytes, self._report_activity)
864
def _ensure_connection(self):
865
"""Connect this medium if not already connected."""
866
raise NotImplementedError(self._ensure_connection)
869
"""See SmartClientStreamMedium._flush().
871
For sockets we do no flushing. For TCP sockets we may want to turn off
872
TCP_NODELAY and add a means to do a flush, but that can be done in the
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
return osutils.read_bytes_from_socket(
881
self._socket, self._report_activity)
725
osutils.send_all(self._socket, bytes)
883
727
def disconnect(self):
884
728
"""See SmartClientMedium.disconnect()."""
888
732
self._socket = None
889
733
self._connected = False
892
class SmartTCPClientMedium(SmartClientSocketMedium):
893
"""A client medium that creates a TCP connection."""
895
def __init__(self, host, port, base):
896
"""Creates a client that will connect on the first use."""
897
SmartClientSocketMedium.__init__(self, base)
901
735
def _ensure_connection(self):
902
736
"""Connect this medium if not already connected."""
903
737
if self._connected:
739
self._socket = socket.socket()
740
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
905
741
if self._port is None:
906
742
port = BZR_DEFAULT_PORT
908
744
port = int(self._port)
910
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
911
socket.SOCK_STREAM, 0, 0)
912
except socket.gaierror, (err_num, err_msg):
913
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
914
(self._host, port, err_msg))
915
# Initialize err in case there are no addresses returned:
916
err = socket.error("no address found for %s" % self._host)
917
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
919
self._socket = socket.socket(family, socktype, proto)
920
self._socket.setsockopt(socket.IPPROTO_TCP,
921
socket.TCP_NODELAY, 1)
922
self._socket.connect(sockaddr)
923
except socket.error, err:
924
if self._socket is not None:
929
if self._socket is None:
746
self._socket.connect((self._host, port))
747
except socket.error, err:
930
748
# socket errors either have a (string) or (errno, string) as their
932
750
if type(err.args) is str:
937
755
(self._host, port, err_msg))
938
756
self._connected = True
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
"""A client medium for an already connected socket.
944
Note that this class will assume it "owns" the socket, so it will close it
945
when its disconnect method is called.
948
def __init__(self, base, sock):
949
SmartClientSocketMedium.__init__(self, base)
951
self._connected = True
953
def _ensure_connection(self):
954
# Already connected, by definition! So nothing to do.
759
"""See SmartClientStreamMedium._flush().
761
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
762
add a means to do a flush, but that can be done in the future.
765
def _read_bytes(self, count):
766
"""See SmartClientMedium.read_bytes."""
767
if not self._connected:
768
raise errors.MediumNotConnected(self)
769
# We ignore the desired_count because on sockets it's more efficient to
770
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
771
return self._socket.recv(_MAX_READ_SIZE)
958
774
class SmartClientStreamMediumRequest(SmartClientMediumRequest):