490
491
return self._medium._get_line()
493
class _VfsRefuser(object):
494
"""An object that refuses all VFS requests.
499
client._SmartClient.hooks.install_named_hook(
500
'call', self.check_vfs, 'vfs refuser')
502
def check_vfs(self, params):
504
request_method = request.request_handlers.get(params.method)
506
# A method we don't know about doesn't count as a VFS method.
508
if issubclass(request_method, vfs.VfsRequest):
509
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
512
494
class _DebugCounter(object):
513
495
"""An object that counts the HPSS calls made to each client medium.
515
When a medium is garbage-collected, or failing that when
516
bzrlib.global_state exits, the total number of calls made on that medium
517
are reported via trace.note.
497
When a medium is garbage-collected, or failing that when atexit functions
498
are run, the total number of calls made on that medium are reported via
520
502
def __init__(self):
521
503
self.counts = weakref.WeakKeyDictionary()
522
504
client._SmartClient.hooks.install_named_hook(
523
505
'call', self.increment_call_count, 'hpss call counter')
524
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
506
atexit.register(self.flush_all)
526
508
def track(self, medium):
527
509
"""Start tracking calls made to a medium.
757
738
def _read_bytes(self, count):
758
739
"""See SmartClientStreamMedium._read_bytes."""
759
bytes_to_read = min(count, _MAX_READ_SIZE)
760
bytes = self._readable_pipe.read(bytes_to_read)
740
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
761
741
self._report_activity(len(bytes), 'read')
765
class SSHParams(object):
766
"""A set of parameters for starting a remote bzr via SSH."""
745
class SmartSSHClientMedium(SmartClientStreamMedium):
746
"""A client medium using SSH."""
768
748
def __init__(self, host, port=None, username=None, password=None,
769
bzr_remote_path='bzr'):
772
self.username = username
773
self.password = password
774
self.bzr_remote_path = bzr_remote_path
777
class SmartSSHClientMedium(SmartClientStreamMedium):
778
"""A client medium using SSH.
780
It delegates IO to a SmartClientSocketMedium or
781
SmartClientAlreadyConnectedSocketMedium (depending on platform).
784
def __init__(self, base, ssh_params, vendor=None):
749
base=None, vendor=None, bzr_remote_path=None):
785
750
"""Creates a client that will connect on the first use.
787
:param ssh_params: A SSHParams instance.
788
752
:param vendor: An optional override for the ssh vendor to use. See
789
753
bzrlib.transport.ssh for details on ssh vendors.
791
self._real_medium = None
792
self._ssh_params = ssh_params
755
self._connected = False
757
self._password = password
759
self._username = username
793
760
# for the benefit of progress making a short description of this
795
762
self._scheme = 'bzr+ssh'
797
764
# _DebugCounter so we have to store all the values used in our repr
798
765
# method before calling the super init.
799
766
SmartClientStreamMedium.__init__(self, base)
767
self._read_from = None
768
self._ssh_connection = None
800
769
self._vendor = vendor
801
self._ssh_connection = None
770
self._write_to = None
771
self._bzr_remote_path = bzr_remote_path
803
773
def __repr__(self):
804
if self._ssh_params.port is None:
774
if self._port is None:
807
maybe_port = ':%s' % self._ssh_params.port
777
maybe_port = ':%s' % self._port
808
778
return "%s(%s://%s@%s%s/)" % (
809
779
self.__class__.__name__,
811
self._ssh_params.username,
812
self._ssh_params.host,
815
785
def _accept_bytes(self, bytes):
816
786
"""See SmartClientStreamMedium.accept_bytes."""
817
787
self._ensure_connection()
818
self._real_medium.accept_bytes(bytes)
788
self._write_to.write(bytes)
789
self._report_activity(len(bytes), 'write')
820
791
def disconnect(self):
821
792
"""See SmartClientMedium.disconnect()."""
822
if self._real_medium is not None:
823
self._real_medium.disconnect()
824
self._real_medium = None
825
if self._ssh_connection is not None:
826
self._ssh_connection.close()
827
self._ssh_connection = None
793
if not self._connected:
795
self._read_from.close()
796
self._write_to.close()
797
self._ssh_connection.close()
798
self._connected = False
829
800
def _ensure_connection(self):
830
801
"""Connect this medium if not already connected."""
831
if self._real_medium is not None:
833
804
if self._vendor is None:
834
805
vendor = ssh._get_ssh_vendor()
836
807
vendor = self._vendor
837
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
838
self._ssh_params.password, self._ssh_params.host,
839
self._ssh_params.port,
840
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
808
self._ssh_connection = vendor.connect_ssh(self._username,
809
self._password, self._host, self._port,
810
command=[self._bzr_remote_path, 'serve', '--inet',
841
811
'--directory=/', '--allow-writes'])
842
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
843
if io_kind == 'socket':
844
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
845
self.base, io_object)
846
elif io_kind == 'pipes':
847
read_from, write_to = io_object
848
self._real_medium = SmartSimplePipesClientMedium(
849
read_from, write_to, self.base)
851
raise AssertionError(
852
"Unexpected io_kind %r from %r"
853
% (io_kind, self._ssh_connection))
812
self._read_from, self._write_to = \
813
self._ssh_connection.get_filelike_channels()
814
self._connected = True
855
816
def _flush(self):
856
817
"""See SmartClientStreamMedium._flush()."""
857
self._real_medium._flush()
818
self._write_to.flush()
859
820
def _read_bytes(self, count):
860
821
"""See SmartClientStreamMedium.read_bytes."""
861
if self._real_medium is None:
822
if not self._connected:
862
823
raise errors.MediumNotConnected(self)
863
return self._real_medium.read_bytes(count)
824
bytes_to_read = min(count, _MAX_READ_SIZE)
825
bytes = self._read_from.read(bytes_to_read)
826
self._report_activity(len(bytes), 'read')
866
830
# Port 4155 is the default port for bzr://, registered with IANA.
868
832
BZR_DEFAULT_PORT = 4155
871
class SmartClientSocketMedium(SmartClientStreamMedium):
872
"""A client medium using a socket.
874
This class isn't usable directly. Use one of its subclasses instead.
835
class SmartTCPClientMedium(SmartClientStreamMedium):
836
"""A client medium using TCP."""
877
def __init__(self, base):
838
def __init__(self, host, port, base):
839
"""Creates a client that will connect on the first use."""
878
840
SmartClientStreamMedium.__init__(self, base)
841
self._connected = False
879
844
self._socket = None
880
self._connected = False
882
846
def _accept_bytes(self, bytes):
883
847
"""See SmartClientMedium.accept_bytes."""
884
848
self._ensure_connection()
885
849
osutils.send_all(self._socket, bytes, self._report_activity)
887
def _ensure_connection(self):
888
"""Connect this medium if not already connected."""
889
raise NotImplementedError(self._ensure_connection)
892
"""See SmartClientStreamMedium._flush().
894
For sockets we do no flushing. For TCP sockets we may want to turn off
895
TCP_NODELAY and add a means to do a flush, but that can be done in the
899
def _read_bytes(self, count):
900
"""See SmartClientMedium.read_bytes."""
901
if not self._connected:
902
raise errors.MediumNotConnected(self)
903
return osutils.read_bytes_from_socket(
904
self._socket, self._report_activity)
906
851
def disconnect(self):
907
852
"""See SmartClientMedium.disconnect()."""
908
853
if not self._connected:
960
895
(self._host, port, err_msg))
961
896
self._connected = True
964
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
965
"""A client medium for an already connected socket.
967
Note that this class will assume it "owns" the socket, so it will close it
968
when its disconnect method is called.
971
def __init__(self, base, sock):
972
SmartClientSocketMedium.__init__(self, base)
974
self._connected = True
976
def _ensure_connection(self):
977
# Already connected, by definition! So nothing to do.
899
"""See SmartClientStreamMedium._flush().
901
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
add a means to do a flush, but that can be done in the future.
905
def _read_bytes(self, count):
906
"""See SmartClientMedium.read_bytes."""
907
if not self._connected:
908
raise errors.MediumNotConnected(self)
909
return osutils.read_bytes_from_socket(
910
self._socket, self._report_activity)
981
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):