22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
breezy/transport/smart/__init__.py.
24
bzrlib/transport/smart/__init__.py.
27
from __future__ import absolute_import
35
from ..lazy_import import lazy_import
31
from bzrlib.lazy_import import lazy_import
36
32
lazy_import(globals(), """
50
from breezy.i18n import gettext
51
from breezy.smart import client, protocol, request, signals, vfs
52
from breezy.transport import ssh
46
from bzrlib.smart import client, protocol, request, vfs
47
from bzrlib.transport import ssh
54
from .. import osutils
49
from bzrlib import osutils
56
51
# Throughout this module buffer size parameters are either limited to be at
57
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
233
214
while not self.finished:
234
215
server_protocol = self._build_protocol()
235
216
self._serve_one_request(server_protocol)
236
except errors.ConnectionTimeout as e:
237
trace.note('%s' % (e,))
238
trace.log_exception_quietly()
239
self._disconnect_client()
240
# We reported it, no reason to make a big fuss.
242
except Exception as e:
243
218
stderr.write("%s terminating on exception %s\n" % (self, e))
245
self._disconnect_client()
247
def _stop_gracefully(self):
248
"""When we finish this message, stop looking for more."""
249
trace.mutter('Stopping %s' % (self,))
252
def _disconnect_client(self):
253
"""Close the current connection. We stopped due to a timeout/etc."""
254
# The default implementation is a no-op, because that is all we used to
255
# do when disconnecting from a client. I suppose we never had the
256
# *server* initiate a disconnect, before
258
def _wait_for_bytes_with_timeout(self, timeout_seconds):
259
"""Wait for more bytes to be read, but timeout if none available.
261
This allows us to detect idle connections, and stop trying to read from
262
them, without setting the socket itself to non-blocking. This also
263
allows us to specify when we watch for idle timeouts.
265
:return: Did we timeout? (True if we timed out, False if there is data
268
raise NotImplementedError(self._wait_for_bytes_with_timeout)
270
221
def _build_protocol(self):
271
222
"""Identifies the version of the incoming request, and returns an
287
234
protocol.accept_bytes(unused_bytes)
290
def _wait_on_descriptor(self, fd, timeout_seconds):
291
"""select() on a file descriptor, waiting for nonblocking read()
293
This will raise a ConnectionTimeout exception if we do not get a
294
readable handle before timeout_seconds.
297
t_end = self._timer() + timeout_seconds
298
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
300
while not rs and not xs and self._timer() < t_end:
304
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
305
except (select.error, socket.error) as e:
306
err = getattr(e, 'errno', None)
307
if err is None and getattr(e, 'args', None) is not None:
308
# select.error doesn't have 'errno', it just has args[0]
310
if err in _bad_file_descriptor:
311
return # Not a socket indicates read() will fail
312
elif err == errno.EINTR:
313
# Interrupted, keep looping.
318
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
319
% (timeout_seconds,))
321
237
def _serve_one_request(self, protocol):
322
238
"""Read one request from input, process, send back a response.
324
240
:param protocol: a SmartServerRequestProtocol.
329
243
self._serve_one_request_unguarded(protocol)
330
244
except KeyboardInterrupt:
332
except Exception as e:
333
247
self.terminate_due_to_error()
335
249
def terminate_due_to_error(self):
347
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
349
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
353
266
:param sock: the socket the server will read from. It will be put
354
267
into blocking mode.
356
269
SmartServerStreamMedium.__init__(
357
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
359
271
sock.setblocking(True)
360
272
self.socket = sock
361
# Get the getpeername now, as we might be closed later when we care.
363
self._client_info = sock.getpeername()
365
self._client_info = '<unknown>'
368
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
371
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
374
274
def _serve_one_request_unguarded(self, protocol):
375
275
while protocol.next_read_size():
385
285
self._push_back(protocol.unused_data)
387
def _disconnect_client(self):
388
"""Close the current connection. We stopped due to a timeout/etc."""
391
def _wait_for_bytes_with_timeout(self, timeout_seconds):
392
"""Wait for more bytes to be read, but timeout if none available.
394
This allows us to detect idle connections, and stop trying to read from
395
them, without setting the socket itself to non-blocking. This also
396
allows us to specify when we watch for idle timeouts.
398
:return: None, this will raise ConnectionTimeout if we time out before
401
return self._wait_on_descriptor(self.socket, timeout_seconds)
403
287
def _read_bytes(self, desired_count):
404
288
return osutils.read_bytes_from_socket(
405
289
self.socket, self._report_activity)
423
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
425
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
426
310
"""Construct new server.
428
312
:param in_file: Python file from which requests can be read.
429
313
:param out_file: Python file to write responses.
430
314
:param backing_transport: Transport for the directory served.
432
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
434
317
if sys.platform == 'win32':
435
318
# force binary mode for files
471
343
protocol.accept_bytes(bytes)
473
def _disconnect_client(self):
478
def _wait_for_bytes_with_timeout(self, timeout_seconds):
479
"""Wait for more bytes to be read, but timeout if none available.
481
This allows us to detect idle connections, and stop trying to read from
482
them, without setting the socket itself to non-blocking. This also
483
allows us to specify when we watch for idle timeouts.
485
:return: None, this will raise ConnectionTimeout if we time out before
488
if (getattr(self._in, 'fileno', None) is None
489
or sys.platform == 'win32'):
490
# You can't select() file descriptors on Windows.
492
return self._wait_on_descriptor(self._in, timeout_seconds)
494
345
def _read_bytes(self, desired_count):
495
346
return self._in.read(desired_count)
640
491
return self._medium._get_line()
643
class _VfsRefuser(object):
644
"""An object that refuses all VFS requests.
649
client._SmartClient.hooks.install_named_hook(
650
'call', self.check_vfs, 'vfs refuser')
652
def check_vfs(self, params):
654
request_method = request.request_handlers.get(params.method)
656
# A method we don't know about doesn't count as a VFS method.
658
if issubclass(request_method, vfs.VfsRequest):
659
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
662
494
class _DebugCounter(object):
663
495
"""An object that counts the HPSS calls made to each client medium.
665
When a medium is garbage-collected, or failing that when
666
breezy.global_state exits, the total number of calls made on that medium
667
are reported via trace.note.
497
When a medium is garbage-collected, or failing that when atexit functions
498
are run, the total number of calls made on that medium are reported via
670
502
def __init__(self):
671
503
self.counts = weakref.WeakKeyDictionary()
672
504
client._SmartClient.hooks.install_named_hook(
673
505
'call', self.increment_call_count, 'hpss call counter')
674
breezy.global_state.cleanups.add_cleanup(self.flush_all)
506
atexit.register(self.flush_all)
676
508
def track(self, medium):
677
509
"""Start tracking calls made to a medium.
884
711
return SmartClientStreamMediumRequest(self)
887
"""We have been disconnected, reset current state.
889
This resets things like _current_request and connected state.
892
self._current_request = None
895
714
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
896
715
"""A client medium using simple pipes.
898
717
This client does not manage the pipes: it assumes they will always be open.
719
Note that if readable_pipe.read might raise IOError or OSError with errno
720
of EINTR, it must be safe to retry the read. Plain CPython fileobjects
721
(such as used for sys.stdin) are safe.
901
724
def __init__(self, readable_pipe, writeable_pipe, base):
906
729
def _accept_bytes(self, bytes):
907
730
"""See SmartClientStreamMedium.accept_bytes."""
909
self._writeable_pipe.write(bytes)
911
if e.errno in (errno.EINVAL, errno.EPIPE):
912
raise errors.ConnectionReset(
913
"Error trying to write to subprocess", e)
731
self._writeable_pipe.write(bytes)
915
732
self._report_activity(len(bytes), 'write')
917
734
def _flush(self):
918
735
"""See SmartClientStreamMedium._flush()."""
919
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
920
# However, testing shows that even when the child process is
921
# gone, this doesn't error.
922
736
self._writeable_pipe.flush()
924
738
def _read_bytes(self, count):
925
739
"""See SmartClientStreamMedium._read_bytes."""
926
bytes_to_read = min(count, _MAX_READ_SIZE)
927
bytes = self._readable_pipe.read(bytes_to_read)
740
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
928
741
self._report_activity(len(bytes), 'read')
932
class SSHParams(object):
933
"""A set of parameters for starting a remote bzr via SSH."""
745
class SmartSSHClientMedium(SmartClientStreamMedium):
746
"""A client medium using SSH."""
935
748
def __init__(self, host, port=None, username=None, password=None,
936
bzr_remote_path='bzr'):
939
self.username = username
940
self.password = password
941
self.bzr_remote_path = bzr_remote_path
944
class SmartSSHClientMedium(SmartClientStreamMedium):
945
"""A client medium using SSH.
947
It delegates IO to a SmartSimplePipesClientMedium or
948
SmartClientAlreadyConnectedSocketMedium (depending on platform).
951
def __init__(self, base, ssh_params, vendor=None):
749
base=None, vendor=None, bzr_remote_path=None):
952
750
"""Creates a client that will connect on the first use.
954
:param ssh_params: A SSHParams instance.
955
752
:param vendor: An optional override for the ssh vendor to use. See
956
breezy.transport.ssh for details on ssh vendors.
753
bzrlib.transport.ssh for details on ssh vendors.
958
self._real_medium = None
959
self._ssh_params = ssh_params
755
self._connected = False
757
self._password = password
759
self._username = username
960
760
# for the benefit of progress making a short description of this
962
762
self._scheme = 'bzr+ssh'
964
764
# _DebugCounter so we have to store all the values used in our repr
965
765
# method before calling the super init.
966
766
SmartClientStreamMedium.__init__(self, base)
767
self._read_from = None
768
self._ssh_connection = None
967
769
self._vendor = vendor
968
self._ssh_connection = None
770
self._write_to = None
771
self._bzr_remote_path = bzr_remote_path
970
773
def __repr__(self):
971
if self._ssh_params.port is None:
774
if self._port is None:
974
maybe_port = ':%s' % self._ssh_params.port
975
if self._ssh_params.username is None:
978
maybe_user = '%s@' % self._ssh_params.username
979
return "%s(%s://%s%s%s/)" % (
777
maybe_port = ':%s' % self._port
778
return "%s(%s://%s@%s%s/)" % (
980
779
self.__class__.__name__,
983
self._ssh_params.host,
986
785
def _accept_bytes(self, bytes):
987
786
"""See SmartClientStreamMedium.accept_bytes."""
988
787
self._ensure_connection()
989
self._real_medium.accept_bytes(bytes)
788
self._write_to.write(bytes)
789
self._report_activity(len(bytes), 'write')
991
791
def disconnect(self):
992
792
"""See SmartClientMedium.disconnect()."""
993
if self._real_medium is not None:
994
self._real_medium.disconnect()
995
self._real_medium = None
996
if self._ssh_connection is not None:
997
self._ssh_connection.close()
998
self._ssh_connection = None
793
if not self._connected:
795
self._read_from.close()
796
self._write_to.close()
797
self._ssh_connection.close()
798
self._connected = False
1000
800
def _ensure_connection(self):
1001
801
"""Connect this medium if not already connected."""
1002
if self._real_medium is not None:
1004
804
if self._vendor is None:
1005
805
vendor = ssh._get_ssh_vendor()
1007
807
vendor = self._vendor
1008
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1009
self._ssh_params.password, self._ssh_params.host,
1010
self._ssh_params.port,
1011
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
808
self._ssh_connection = vendor.connect_ssh(self._username,
809
self._password, self._host, self._port,
810
command=[self._bzr_remote_path, 'serve', '--inet',
1012
811
'--directory=/', '--allow-writes'])
1013
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1014
if io_kind == 'socket':
1015
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1016
self.base, io_object)
1017
elif io_kind == 'pipes':
1018
read_from, write_to = io_object
1019
self._real_medium = SmartSimplePipesClientMedium(
1020
read_from, write_to, self.base)
1022
raise AssertionError(
1023
"Unexpected io_kind %r from %r"
1024
% (io_kind, self._ssh_connection))
1025
for hook in transport.Transport.hooks["post_connect"]:
812
self._read_from, self._write_to = \
813
self._ssh_connection.get_filelike_channels()
814
self._connected = True
1028
816
def _flush(self):
1029
817
"""See SmartClientStreamMedium._flush()."""
1030
self._real_medium._flush()
818
self._write_to.flush()
1032
820
def _read_bytes(self, count):
1033
821
"""See SmartClientStreamMedium.read_bytes."""
1034
if self._real_medium is None:
822
if not self._connected:
1035
823
raise errors.MediumNotConnected(self)
1036
return self._real_medium.read_bytes(count)
824
bytes_to_read = min(count, _MAX_READ_SIZE)
825
bytes = self._read_from.read(bytes_to_read)
826
self._report_activity(len(bytes), 'read')
1039
830
# Port 4155 is the default port for bzr://, registered with IANA.
1041
832
BZR_DEFAULT_PORT = 4155
1044
class SmartClientSocketMedium(SmartClientStreamMedium):
1045
"""A client medium using a socket.
1047
This class isn't usable directly. Use one of its subclasses instead.
1050
def __init__(self, base):
835
class SmartTCPClientMedium(SmartClientStreamMedium):
836
"""A client medium using TCP."""
838
def __init__(self, host, port, base):
839
"""Creates a client that will connect on the first use."""
1051
840
SmartClientStreamMedium.__init__(self, base)
841
self._connected = False
1052
844
self._socket = None
1053
self._connected = False
1055
846
def _accept_bytes(self, bytes):
1056
847
"""See SmartClientMedium.accept_bytes."""
1057
848
self._ensure_connection()
1058
849
osutils.send_all(self._socket, bytes, self._report_activity)
1060
def _ensure_connection(self):
1061
"""Connect this medium if not already connected."""
1062
raise NotImplementedError(self._ensure_connection)
1065
"""See SmartClientStreamMedium._flush().
1067
For sockets we do no flushing. For TCP sockets we may want to turn off
1068
TCP_NODELAY and add a means to do a flush, but that can be done in the
1072
def _read_bytes(self, count):
1073
"""See SmartClientMedium.read_bytes."""
1074
if not self._connected:
1075
raise errors.MediumNotConnected(self)
1076
return osutils.read_bytes_from_socket(
1077
self._socket, self._report_activity)
1079
851
def disconnect(self):
1080
852
"""See SmartClientMedium.disconnect()."""
1081
853
if not self._connected:
1106
868
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1107
869
socket.SOCK_STREAM, 0, 0)
1108
except socket.gaierror as xxx_todo_changeme:
1109
(err_num, err_msg) = xxx_todo_changeme.args
870
except socket.gaierror, (err_num, err_msg):
1110
871
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1111
872
(self._host, port, err_msg))
1112
873
# Initialize err in case there are no addresses returned:
1126
887
if self._socket is None:
1127
888
# socket errors either have a (string) or (errno, string) as their
1129
if isinstance(err.args, str):
890
if type(err.args) is str:
1130
891
err_msg = err.args
1132
893
err_msg = err.args[1]
1133
894
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1134
895
(self._host, port, err_msg))
1135
896
self._connected = True
1136
for hook in transport.Transport.hooks["post_connect"]:
1140
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1141
"""A client medium for an already connected socket.
1143
Note that this class will assume it "owns" the socket, so it will close it
1144
when its disconnect method is called.
1147
def __init__(self, base, sock):
1148
SmartClientSocketMedium.__init__(self, base)
1150
self._connected = True
1152
def _ensure_connection(self):
1153
# Already connected, by definition! So nothing to do.
899
"""See SmartClientStreamMedium._flush().
901
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
add a means to do a flush, but that can be done in the future.
905
def _read_bytes(self, count):
906
"""See SmartClientMedium.read_bytes."""
907
if not self._connected:
908
raise errors.MediumNotConnected(self)
909
return osutils.read_bytes_from_socket(
910
self._socket, self._report_activity)
1157
913
class SmartClientStreamMediumRequest(SmartClientMediumRequest):