22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
breezy/transport/smart/__init__.py.
24
bzrlib/transport/smart/__init__.py.
36
import thread as _thread
39
from ...lazy_import import lazy_import
33
from bzrlib.lazy_import import lazy_import
40
34
lazy_import(globals(), """
52
from breezy.i18n import gettext
53
from breezy.bzr.smart import client, protocol, request, signals, vfs
54
from breezy.transport import ssh
45
from bzrlib.smart import client, protocol, request, vfs
46
from bzrlib.transport import ssh
61
# Throughout this module buffer size parameters are either limited to be at
62
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
63
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
64
# from non-sockets as well.
65
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
68
class HpssVfsRequestNotAllowed(errors.BzrError):
70
_fmt = ("VFS requests over the smart server are not allowed. Encountered: "
71
"%(method)s, %(arguments)s.")
73
def __init__(self, method, arguments):
75
self.arguments = arguments
48
#usually already imported, and getting IllegalScoperReplacer on it here.
49
from bzrlib import osutils
51
# We must not read any more than 64k at a time so we don't risk "no buffer
52
# space available" errors on some platforms. Windows in particular is likely
53
# to give error 10053 or 10055 if we read more than 64k from a socket.
54
_MAX_READ_SIZE = 64 * 1024
78
57
def _get_protocol_factory_for_bytes(bytes):
251
214
while not self.finished:
252
215
server_protocol = self._build_protocol()
253
216
self._serve_one_request(server_protocol)
254
except errors.ConnectionTimeout as e:
255
trace.note('%s' % (e,))
256
trace.log_exception_quietly()
257
self._disconnect_client()
258
# We reported it, no reason to make a big fuss.
260
except Exception as e:
261
218
stderr.write("%s terminating on exception %s\n" % (self, e))
263
self._disconnect_client()
265
def _stop_gracefully(self):
266
"""When we finish this message, stop looking for more."""
267
trace.mutter('Stopping %s' % (self,))
270
def _disconnect_client(self):
271
"""Close the current connection. We stopped due to a timeout/etc."""
272
# The default implementation is a no-op, because that is all we used to
273
# do when disconnecting from a client. I suppose we never had the
274
# *server* initiate a disconnect, before
276
def _wait_for_bytes_with_timeout(self, timeout_seconds):
277
"""Wait for more bytes to be read, but timeout if none available.
279
This allows us to detect idle connections, and stop trying to read from
280
them, without setting the socket itself to non-blocking. This also
281
allows us to specify when we watch for idle timeouts.
283
:return: Did we timeout? (True if we timed out, False if there is data
286
raise NotImplementedError(self._wait_for_bytes_with_timeout)
288
221
def _build_protocol(self):
289
222
"""Identifies the version of the incoming request, and returns an
305
234
protocol.accept_bytes(unused_bytes)
308
def _wait_on_descriptor(self, fd, timeout_seconds):
309
"""select() on a file descriptor, waiting for nonblocking read()
311
This will raise a ConnectionTimeout exception if we do not get a
312
readable handle before timeout_seconds.
315
t_end = self._timer() + timeout_seconds
316
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
318
while not rs and not xs and self._timer() < t_end:
322
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
323
except (select.error, socket.error) as e:
324
err = getattr(e, 'errno', None)
325
if err is None and getattr(e, 'args', None) is not None:
326
# select.error doesn't have 'errno', it just has args[0]
328
if err in _bad_file_descriptor:
329
return # Not a socket indicates read() will fail
330
elif err == errno.EINTR:
331
# Interrupted, keep looping.
335
return # Socket may already be closed
338
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
339
% (timeout_seconds,))
341
237
def _serve_one_request(self, protocol):
342
238
"""Read one request from input, process, send back a response.
344
240
:param protocol: a SmartServerRequestProtocol.
349
243
self._serve_one_request_unguarded(protocol)
350
244
except KeyboardInterrupt:
352
except Exception as e:
353
247
self.terminate_due_to_error()
355
249
def terminate_due_to_error(self):
367
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
369
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
373
266
:param sock: the socket the server will read from. It will be put
374
267
into blocking mode.
376
269
SmartServerStreamMedium.__init__(
377
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
379
271
sock.setblocking(True)
380
272
self.socket = sock
381
# Get the getpeername now, as we might be closed later when we care.
383
self._client_info = sock.getpeername()
385
self._client_info = '<unknown>'
388
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
391
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
394
274
def _serve_one_request_unguarded(self, protocol):
395
275
while protocol.next_read_size():
396
276
# We can safely try to read large chunks. If there is less data
397
# than MAX_SOCKET_CHUNK ready, the socket will just return a
398
# short read immediately rather than block.
399
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
277
# than _MAX_READ_SIZE ready, the socket wil just return a short
278
# read immediately rather than block.
279
bytes = self.read_bytes(_MAX_READ_SIZE)
401
281
self.finished = True
403
283
protocol.accept_bytes(bytes)
405
285
self._push_back(protocol.unused_data)
407
def _disconnect_client(self):
408
"""Close the current connection. We stopped due to a timeout/etc."""
411
def _wait_for_bytes_with_timeout(self, timeout_seconds):
412
"""Wait for more bytes to be read, but timeout if none available.
414
This allows us to detect idle connections, and stop trying to read from
415
them, without setting the socket itself to non-blocking. This also
416
allows us to specify when we watch for idle timeouts.
418
:return: None, this will raise ConnectionTimeout if we time out before
421
return self._wait_on_descriptor(self.socket, timeout_seconds)
423
287
def _read_bytes(self, desired_count):
424
return osutils.read_bytes_from_socket(
425
self.socket, self._report_activity)
288
return _read_bytes_from_socket(
289
self.socket.recv, desired_count, self._report_activity)
427
291
def terminate_due_to_error(self):
428
292
# TODO: This should log to a server log file, but no such thing
431
295
self.finished = True
433
297
def _write_out(self, bytes):
434
tstart = osutils.perf_counter()
435
298
osutils.send_all(self.socket, bytes, self._report_activity)
436
if 'hpss' in debug.debug_flags:
437
thread_id = _thread.get_ident()
438
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
439
% ('wrote', thread_id, len(bytes),
440
osutils.perf_counter() - tstart))
443
301
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
445
def __init__(self, in_file, out_file, backing_transport, timeout=None):
303
def __init__(self, in_file, out_file, backing_transport):
446
304
"""Construct new server.
448
306
:param in_file: Python file from which requests can be read.
449
307
:param out_file: Python file to write responses.
450
308
:param backing_transport: Transport for the directory served.
452
SmartServerStreamMedium.__init__(self, backing_transport,
310
SmartServerStreamMedium.__init__(self, backing_transport)
454
311
if sys.platform == 'win32':
455
312
# force binary mode for files
663
485
return self._medium._get_line()
666
class _VfsRefuser(object):
667
"""An object that refuses all VFS requests.
672
client._SmartClient.hooks.install_named_hook(
673
'call', self.check_vfs, 'vfs refuser')
675
def check_vfs(self, params):
677
request_method = request.request_handlers.get(params.method)
679
# A method we don't know about doesn't count as a VFS method.
681
if issubclass(request_method, vfs.VfsRequest):
682
raise HpssVfsRequestNotAllowed(params.method, params.args)
685
488
class _DebugCounter(object):
686
489
"""An object that counts the HPSS calls made to each client medium.
688
When a medium is garbage-collected, or failing that when
689
breezy.global_state exits, the total number of calls made on that medium
690
are reported via trace.note.
491
When a medium is garbage-collected, or failing that when atexit functions
492
are run, the total number of calls made on that medium are reported via
693
496
def __init__(self):
694
497
self.counts = weakref.WeakKeyDictionary()
695
498
client._SmartClient.hooks.install_named_hook(
696
499
'call', self.increment_call_count, 'hpss call counter')
697
breezy.get_global_state().exit_stack.callback(self.flush_all)
500
atexit.register(self.flush_all)
699
502
def track(self, medium):
700
503
"""Start tracking calls made to a medium.
799
596
:seealso: _is_remote_before
801
598
if (self._remote_version_is_before is not None and
802
version_tuple > self._remote_version_is_before):
599
version_tuple > self._remote_version_is_before):
803
600
# We have been told that the remote side is older than some version
804
601
# which is newer than a previously supplied older-than version.
805
602
# This indicates that some smart verb call is not guarded
806
603
# appropriately (it should simply not have been tried).
604
raise AssertionError(
808
605
"_remember_remote_is_before(%r) called, but "
809
"_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
810
if 'hpss' in debug.debug_flags:
811
ui.ui_factory.show_warning(
812
"_remember_remote_is_before(%r) called, but "
813
"_remember_remote_is_before(%r) was called previously."
814
% (version_tuple, self._remote_version_is_before))
606
"_remember_remote_is_before(%r) was called previously."
607
% (version_tuple, self._remote_version_is_before))
816
608
self._remote_version_is_before = version_tuple
818
610
def protocol_version(self):
927
710
self._readable_pipe = readable_pipe
928
711
self._writeable_pipe = writeable_pipe
930
def _accept_bytes(self, data):
713
def _accept_bytes(self, bytes):
931
714
"""See SmartClientStreamMedium.accept_bytes."""
933
self._writeable_pipe.write(data)
935
if e.errno in (errno.EINVAL, errno.EPIPE):
936
raise errors.ConnectionReset(
937
"Error trying to write to subprocess", e)
939
self._report_activity(len(data), 'write')
715
self._writeable_pipe.write(bytes)
716
self._report_activity(len(bytes), 'write')
941
718
def _flush(self):
942
719
"""See SmartClientStreamMedium._flush()."""
943
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
944
# However, testing shows that even when the child process is
945
# gone, this doesn't error.
946
720
self._writeable_pipe.flush()
948
722
def _read_bytes(self, count):
949
723
"""See SmartClientStreamMedium._read_bytes."""
950
bytes_to_read = min(count, _MAX_READ_SIZE)
951
data = self._readable_pipe.read(bytes_to_read)
952
self._report_activity(len(data), 'read')
956
class SSHParams(object):
957
"""A set of parameters for starting a remote bzr via SSH."""
724
bytes = self._readable_pipe.read(count)
725
self._report_activity(len(bytes), 'read')
729
class SmartSSHClientMedium(SmartClientStreamMedium):
730
"""A client medium using SSH."""
959
732
def __init__(self, host, port=None, username=None, password=None,
960
bzr_remote_path='bzr'):
963
self.username = username
964
self.password = password
965
self.bzr_remote_path = bzr_remote_path
968
class SmartSSHClientMedium(SmartClientStreamMedium):
969
"""A client medium using SSH.
971
It delegates IO to a SmartSimplePipesClientMedium or
972
SmartClientAlreadyConnectedSocketMedium (depending on platform).
975
def __init__(self, base, ssh_params, vendor=None):
733
base=None, vendor=None, bzr_remote_path=None):
976
734
"""Creates a client that will connect on the first use.
978
:param ssh_params: A SSHParams instance.
979
736
:param vendor: An optional override for the ssh vendor to use. See
980
breezy.transport.ssh for details on ssh vendors.
737
bzrlib.transport.ssh for details on ssh vendors.
982
self._real_medium = None
983
self._ssh_params = ssh_params
984
# for the benefit of progress making a short description of this
986
self._scheme = 'bzr+ssh'
739
self._connected = False
741
self._password = password
743
self._username = username
987
744
# SmartClientStreamMedium stores the repr of this object in its
988
745
# _DebugCounter so we have to store all the values used in our repr
989
746
# method before calling the super init.
990
747
SmartClientStreamMedium.__init__(self, base)
748
self._read_from = None
749
self._ssh_connection = None
991
750
self._vendor = vendor
992
self._ssh_connection = None
751
self._write_to = None
752
self._bzr_remote_path = bzr_remote_path
753
# for the benefit of progress making a short description of this
755
self._scheme = 'bzr+ssh'
994
757
def __repr__(self):
995
if self._ssh_params.port is None:
998
maybe_port = ':%s' % self._ssh_params.port
999
if self._ssh_params.username is None:
1002
maybe_user = '%s@' % self._ssh_params.username
1003
return "%s(%s://%s%s%s/)" % (
758
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
1004
759
self.__class__.__name__,
1007
self._ssh_params.host,
1010
765
def _accept_bytes(self, bytes):
1011
766
"""See SmartClientStreamMedium.accept_bytes."""
1012
767
self._ensure_connection()
1013
self._real_medium.accept_bytes(bytes)
768
self._write_to.write(bytes)
769
self._report_activity(len(bytes), 'write')
1015
771
def disconnect(self):
1016
772
"""See SmartClientMedium.disconnect()."""
1017
if self._real_medium is not None:
1018
self._real_medium.disconnect()
1019
self._real_medium = None
1020
if self._ssh_connection is not None:
1021
self._ssh_connection.close()
1022
self._ssh_connection = None
773
if not self._connected:
775
self._read_from.close()
776
self._write_to.close()
777
self._ssh_connection.close()
778
self._connected = False
1024
780
def _ensure_connection(self):
1025
781
"""Connect this medium if not already connected."""
1026
if self._real_medium is not None:
1028
784
if self._vendor is None:
1029
785
vendor = ssh._get_ssh_vendor()
1031
787
vendor = self._vendor
1032
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1033
self._ssh_params.password, self._ssh_params.host,
1034
self._ssh_params.port,
1035
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1036
'--directory=/', '--allow-writes'])
1037
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1038
if io_kind == 'socket':
1039
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1040
self.base, io_object)
1041
elif io_kind == 'pipes':
1042
read_from, write_to = io_object
1043
self._real_medium = SmartSimplePipesClientMedium(
1044
read_from, write_to, self.base)
1046
raise AssertionError(
1047
"Unexpected io_kind %r from %r"
1048
% (io_kind, self._ssh_connection))
1049
for hook in transport.Transport.hooks["post_connect"]:
788
self._ssh_connection = vendor.connect_ssh(self._username,
789
self._password, self._host, self._port,
790
command=[self._bzr_remote_path, 'serve', '--inet',
791
'--directory=/', '--allow-writes'])
792
self._read_from, self._write_to = \
793
self._ssh_connection.get_filelike_channels()
794
self._connected = True
1052
796
def _flush(self):
1053
797
"""See SmartClientStreamMedium._flush()."""
1054
self._real_medium._flush()
798
self._write_to.flush()
1056
800
def _read_bytes(self, count):
1057
801
"""See SmartClientStreamMedium.read_bytes."""
1058
if self._real_medium is None:
802
if not self._connected:
1059
803
raise errors.MediumNotConnected(self)
1060
return self._real_medium.read_bytes(count)
804
bytes_to_read = min(count, _MAX_READ_SIZE)
805
bytes = self._read_from.read(bytes_to_read)
806
self._report_activity(len(bytes), 'read')
1063
810
# Port 4155 is the default port for bzr://, registered with IANA.
1065
812
BZR_DEFAULT_PORT = 4155
1068
class SmartClientSocketMedium(SmartClientStreamMedium):
1069
"""A client medium using a socket.
1071
This class isn't usable directly. Use one of its subclasses instead.
1074
def __init__(self, base):
815
class SmartTCPClientMedium(SmartClientStreamMedium):
816
"""A client medium using TCP."""
818
def __init__(self, host, port, base):
819
"""Creates a client that will connect on the first use."""
1075
820
SmartClientStreamMedium.__init__(self, base)
821
self._connected = False
1076
824
self._socket = None
1077
self._connected = False
1079
826
def _accept_bytes(self, bytes):
1080
827
"""See SmartClientMedium.accept_bytes."""
1081
828
self._ensure_connection()
1082
829
osutils.send_all(self._socket, bytes, self._report_activity)
1084
def _ensure_connection(self):
1085
"""Connect this medium if not already connected."""
1086
raise NotImplementedError(self._ensure_connection)
1089
"""See SmartClientStreamMedium._flush().
1091
For sockets we do no flushing. For TCP sockets we may want to turn off
1092
TCP_NODELAY and add a means to do a flush, but that can be done in the
1096
def _read_bytes(self, count):
1097
"""See SmartClientMedium.read_bytes."""
1098
if not self._connected:
1099
raise errors.MediumNotConnected(self)
1100
return osutils.read_bytes_from_socket(
1101
self._socket, self._report_activity)
1103
831
def disconnect(self):
1104
832
"""See SmartClientMedium.disconnect()."""
1105
833
if not self._connected:
1128
846
port = int(self._port)
1130
848
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1131
socket.SOCK_STREAM, 0, 0)
1132
except socket.gaierror as xxx_todo_changeme:
1133
(err_num, err_msg) = xxx_todo_changeme.args
849
socket.SOCK_STREAM, 0, 0)
850
except socket.gaierror, (err_num, err_msg):
1134
851
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1135
(self._host, port, err_msg))
852
(self._host, port, err_msg))
1136
853
# Initialize err in case there are no addresses returned:
1137
last_err = socket.error("no address found for %s" % self._host)
854
err = socket.error("no address found for %s" % self._host)
1138
855
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1140
857
self._socket = socket.socket(family, socktype, proto)
1141
858
self._socket.setsockopt(socket.IPPROTO_TCP,
1142
859
socket.TCP_NODELAY, 1)
1143
860
self._socket.connect(sockaddr)
1144
except socket.error as err:
861
except socket.error, err:
1145
862
if self._socket is not None:
1146
863
self._socket.close()
1147
864
self._socket = None
1151
867
if self._socket is None:
1152
868
# socket errors either have a (string) or (errno, string) as their
1154
if isinstance(last_err.args, str):
1155
err_msg = last_err.args
870
if type(err.args) is str:
1157
err_msg = last_err.args[1]
873
err_msg = err.args[1]
1158
874
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1159
(self._host, port, err_msg))
1160
self._connected = True
1161
for hook in transport.Transport.hooks["post_connect"]:
1165
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1166
"""A client medium for an already connected socket.
1168
Note that this class will assume it "owns" the socket, so it will close it
1169
when its disconnect method is called.
1172
def __init__(self, base, sock):
1173
SmartClientSocketMedium.__init__(self, base)
1175
self._connected = True
1177
def _ensure_connection(self):
1178
# Already connected, by definition! So nothing to do.
875
(self._host, port, err_msg))
876
self._connected = True
879
"""See SmartClientStreamMedium._flush().
881
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
882
add a means to do a flush, but that can be done in the future.
885
def _read_bytes(self, count):
886
"""See SmartClientMedium.read_bytes."""
887
if not self._connected:
888
raise errors.MediumNotConnected(self)
889
return _read_bytes_from_socket(
890
self._socket.recv, count, self._report_activity)
1182
893
class SmartClientStreamMediumRequest(SmartClientMediumRequest):