22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
bzrlib/transport/smart/__init__.py.
24
breezy/transport/smart/__init__.py.
27
from __future__ import absolute_import
35
from bzrlib.lazy_import import lazy_import
36
import thread as _thread
39
from ...lazy_import import lazy_import
36
40
lazy_import(globals(), """
50
from bzrlib.i18n import gettext
51
from bzrlib.smart import client, protocol, request, signals, vfs
52
from bzrlib.transport import ssh
52
from breezy.i18n import gettext
53
from breezy.bzr.smart import client, protocol, request, signals, vfs
54
from breezy.transport import ssh
54
from bzrlib import osutils
56
61
# Throughout this module buffer size parameters are either limited to be at
57
62
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
59
64
# from non-sockets as well.
60
65
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
68
class HpssVfsRequestNotAllowed(errors.BzrError):
70
_fmt = ("VFS requests over the smart server are not allowed. Encountered: "
71
"%(method)s, %(arguments)s.")
73
def __init__(self, method, arguments):
75
self.arguments = arguments
62
78
def _get_protocol_factory_for_bytes(bytes):
63
79
"""Determine the right protocol factory for 'bytes'.
100
116
:returns: a tuple of two bytestrings: (line, excess)
104
120
while newline_pos == -1:
105
121
new_bytes = read_bytes_func(1)
106
122
bytes += new_bytes
108
124
# Ran out of bytes before receiving a complete line.
110
newline_pos = bytes.find('\n')
111
line = bytes[:newline_pos+1]
112
excess = bytes[newline_pos+1:]
126
newline_pos = bytes.find(b'\n')
127
line = bytes[:newline_pos + 1]
128
excess = bytes[newline_pos + 1:]
113
129
return line, excess
119
135
def __init__(self):
120
136
self._push_back_buffer = None
122
def _push_back(self, bytes):
138
def _push_back(self, data):
123
139
"""Return unused bytes to the medium, because they belong to the next
126
142
This sets the _push_back_buffer to the given bytes.
144
if not isinstance(data, bytes):
145
raise TypeError(data)
128
146
if self._push_back_buffer is not None:
129
147
raise AssertionError(
130
148
"_push_back called when self._push_back_buffer is %r"
131
149
% (self._push_back_buffer,))
134
self._push_back_buffer = bytes
152
self._push_back_buffer = data
136
154
def _get_push_back_buffer(self):
137
if self._push_back_buffer == '':
155
if self._push_back_buffer == b'':
138
156
raise AssertionError(
139
157
'%s._push_back_buffer should never be the empty string, '
140
158
'which can be confused with EOF' % (self,))
233
251
while not self.finished:
234
252
server_protocol = self._build_protocol()
235
253
self._serve_one_request(server_protocol)
236
except errors.ConnectionTimeout, e:
254
except errors.ConnectionTimeout as e:
237
255
trace.note('%s' % (e,))
238
256
trace.log_exception_quietly()
239
257
self._disconnect_client()
240
258
# We reported it, no reason to make a big fuss.
260
except Exception as e:
243
261
stderr.write("%s terminating on exception %s\n" % (self, e))
245
263
self._disconnect_client()
308
326
# select.error doesn't have 'errno', it just has args[0]
310
328
if err in _bad_file_descriptor:
311
return # Not a socket indicates read() will fail
329
return # Not a socket indicates read() will fail
312
330
elif err == errno.EINTR:
313
331
# Interrupted, keep looping.
335
return # Socket may already be closed
318
338
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
377
397
# than MAX_SOCKET_CHUNK ready, the socket will just return a
378
398
# short read immediately rather than block.
379
399
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
381
401
self.finished = True
383
403
protocol.accept_bytes(bytes)
411
431
self.finished = True
413
433
def _write_out(self, bytes):
414
tstart = osutils.timer_func()
434
tstart = osutils.perf_counter()
415
435
osutils.send_all(self.socket, bytes, self._report_activity)
416
436
if 'hpss' in debug.debug_flags:
417
thread_id = thread.get_ident()
437
thread_id = _thread.get_ident()
418
438
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
419
439
% ('wrote', thread_id, len(bytes),
420
osutils.timer_func() - tstart))
440
osutils.perf_counter() - tstart))
423
443
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
430
450
:param backing_transport: Transport for the directory served.
432
452
SmartServerStreamMedium.__init__(self, backing_transport,
434
454
if sys.platform == 'win32':
435
455
# force binary mode for files
486
506
data is available.
488
508
if (getattr(self._in, 'fileno', None) is None
489
or sys.platform == 'win32'):
509
or sys.platform == 'win32'):
490
510
# You can't select() file descriptors on Windows.
492
return self._wait_on_descriptor(self._in, timeout_seconds)
513
return self._wait_on_descriptor(self._in, timeout_seconds)
514
except io.UnsupportedOperation:
494
517
def _read_bytes(self, desired_count):
495
518
return self._in.read(desired_count)
625
648
def read_line(self):
626
649
line = self._read_line()
627
if not line.endswith('\n'):
650
if not line.endswith(b'\n'):
628
651
# end of file encountered reading from server
629
652
raise errors.ConnectionReset(
630
653
"Unexpected end of message. Please check connectivity "
656
679
# A method we don't know about doesn't count as a VFS method.
658
681
if issubclass(request_method, vfs.VfsRequest):
659
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
682
raise HpssVfsRequestNotAllowed(params.method, params.args)
662
685
class _DebugCounter(object):
663
686
"""An object that counts the HPSS calls made to each client medium.
665
688
When a medium is garbage-collected, or failing that when
666
bzrlib.global_state exits, the total number of calls made on that medium
689
breezy.global_state exits, the total number of calls made on that medium
667
690
are reported via trace.note.
671
694
self.counts = weakref.WeakKeyDictionary()
672
695
client._SmartClient.hooks.install_named_hook(
673
696
'call', self.increment_call_count, 'hpss call counter')
674
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
697
breezy.get_global_state().exit_stack.callback(self.flush_all)
676
699
def track(self, medium):
677
700
"""Start tracking calls made to a medium.
775
799
:seealso: _is_remote_before
777
801
if (self._remote_version_is_before is not None and
778
version_tuple > self._remote_version_is_before):
802
version_tuple > self._remote_version_is_before):
779
803
# We have been told that the remote side is older than some version
780
804
# which is newer than a previously supplied older-than version.
781
805
# This indicates that some smart verb call is not guarded
782
806
# appropriately (it should simply not have been tried).
784
808
"_remember_remote_is_before(%r) called, but "
785
"_remember_remote_is_before(%r) was called previously."
786
, version_tuple, self._remote_version_is_before)
809
"_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
787
810
if 'hpss' in debug.debug_flags:
788
811
ui.ui_factory.show_warning(
789
812
"_remember_remote_is_before(%r) called, but "
801
824
medium_request = self.get_request()
802
825
# Send a 'hello' request in protocol version one, for maximum
803
826
# backwards compatibility.
804
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
827
client_protocol = protocol.SmartClientRequestProtocolOne(
805
829
client_protocol.query_version()
806
830
self._done_hello = True
807
except errors.SmartProtocolError, e:
831
except errors.SmartProtocolError as e:
808
832
# Cache the error, just like we would cache a successful
810
834
self._protocol_version_error = e
903
927
self._readable_pipe = readable_pipe
904
928
self._writeable_pipe = writeable_pipe
906
def _accept_bytes(self, bytes):
930
def _accept_bytes(self, data):
907
931
"""See SmartClientStreamMedium.accept_bytes."""
909
self._writeable_pipe.write(bytes)
933
self._writeable_pipe.write(data)
911
935
if e.errno in (errno.EINVAL, errno.EPIPE):
912
936
raise errors.ConnectionReset(
913
937
"Error trying to write to subprocess", e)
915
self._report_activity(len(bytes), 'write')
939
self._report_activity(len(data), 'write')
917
941
def _flush(self):
918
942
"""See SmartClientStreamMedium._flush()."""
924
948
def _read_bytes(self, count):
925
949
"""See SmartClientStreamMedium._read_bytes."""
926
950
bytes_to_read = min(count, _MAX_READ_SIZE)
927
bytes = self._readable_pipe.read(bytes_to_read)
928
self._report_activity(len(bytes), 'read')
951
data = self._readable_pipe.read(bytes_to_read)
952
self._report_activity(len(data), 'read')
932
956
class SSHParams(object):
933
957
"""A set of parameters for starting a remote bzr via SSH."""
935
959
def __init__(self, host, port=None, username=None, password=None,
936
bzr_remote_path='bzr'):
960
bzr_remote_path='bzr'):
939
963
self.username = username
954
978
:param ssh_params: An SSHParams instance.
955
979
:param vendor: An optional override for the ssh vendor to use. See
956
bzrlib.transport.ssh for details on ssh vendors.
980
breezy.transport.ssh for details on ssh vendors.
958
982
self._real_medium = None
959
983
self._ssh_params = ssh_params
1007
1031
vendor = self._vendor
1008
1032
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1009
self._ssh_params.password, self._ssh_params.host,
1010
self._ssh_params.port,
1011
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1012
'--directory=/', '--allow-writes'])
1033
self._ssh_params.password, self._ssh_params.host,
1034
self._ssh_params.port,
1035
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1036
'--directory=/', '--allow-writes'])
1013
1037
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1014
1038
if io_kind == 'socket':
1015
1039
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1104
1128
port = int(self._port)
1106
1130
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1107
socket.SOCK_STREAM, 0, 0)
1108
except socket.gaierror, (err_num, err_msg):
1131
socket.SOCK_STREAM, 0, 0)
1132
except socket.gaierror as xxx_todo_changeme:
1133
(err_num, err_msg) = xxx_todo_changeme.args
1109
1134
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1110
(self._host, port, err_msg))
1135
(self._host, port, err_msg))
1111
1136
# Initialize err in case there are no addresses returned:
1112
err = socket.error("no address found for %s" % self._host)
1137
last_err = socket.error("no address found for %s" % self._host)
1113
1138
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1115
1140
self._socket = socket.socket(family, socktype, proto)
1116
1141
self._socket.setsockopt(socket.IPPROTO_TCP,
1117
1142
socket.TCP_NODELAY, 1)
1118
1143
self._socket.connect(sockaddr)
1119
except socket.error, err:
1144
except socket.error as err:
1120
1145
if self._socket is not None:
1121
1146
self._socket.close()
1122
1147
self._socket = None
1125
1151
if self._socket is None:
1126
1152
# socket errors either have a (string) or (errno, string) as their
1128
if type(err.args) is str:
1154
if isinstance(last_err.args, str):
1155
err_msg = last_err.args
1131
err_msg = err.args[1]
1157
err_msg = last_err.args[1]
1132
1158
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1133
(self._host, port, err_msg))
1159
(self._host, port, err_msg))
1134
1160
self._connected = True
1135
1161
for hook in transport.Transport.hooks["post_connect"]:
1139
1165
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1140
1166
"""A client medium for an already connected socket.
1142
1168
Note that this class will assume it "owns" the socket, so it will close it
1143
1169
when its disconnect method is called.