22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
breezy/transport/smart/__init__.py.
24
brzlib/transport/smart/__init__.py.
27
from __future__ import absolute_import
36
import thread as _thread
39
from ...lazy_import import lazy_import
35
from brzlib.lazy_import import lazy_import
40
36
lazy_import(globals(), """
52
from breezy.i18n import gettext
53
from breezy.bzr.smart import client, protocol, request, signals, vfs
54
from breezy.transport import ssh
50
from brzlib.i18n import gettext
51
from brzlib.smart import client, protocol, request, signals, vfs
52
from brzlib.transport import ssh
54
from brzlib import osutils
61
56
# Throughout this module buffer size parameters are either limited to be at
62
57
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
64
59
# from non-sockets as well.
65
60
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
68
class HpssVfsRequestNotAllowed(errors.BzrError):
70
_fmt = ("VFS requests over the smart server are not allowed. Encountered: "
71
"%(method)s, %(arguments)s.")
73
def __init__(self, method, arguments):
75
self.arguments = arguments
78
62
def _get_protocol_factory_for_bytes(bytes):
79
63
"""Determine the right protocol factory for 'bytes'.
116
100
:returns: a tuple of two strs: (line, excess)
120
104
while newline_pos == -1:
121
105
new_bytes = read_bytes_func(1)
122
106
bytes += new_bytes
124
108
# Ran out of bytes before receiving a complete line.
126
newline_pos = bytes.find(b'\n')
127
line = bytes[:newline_pos + 1]
128
excess = bytes[newline_pos + 1:]
110
newline_pos = bytes.find('\n')
111
line = bytes[:newline_pos+1]
112
excess = bytes[newline_pos+1:]
129
113
return line, excess
135
119
def __init__(self):
136
120
self._push_back_buffer = None
138
def _push_back(self, data):
122
def _push_back(self, bytes):
139
123
"""Return unused bytes to the medium, because they belong to the next
142
126
This sets the _push_back_buffer to the given bytes.
144
if not isinstance(data, bytes):
145
raise TypeError(data)
146
128
if self._push_back_buffer is not None:
147
129
raise AssertionError(
148
130
"_push_back called when self._push_back_buffer is %r"
149
131
% (self._push_back_buffer,))
152
self._push_back_buffer = data
134
self._push_back_buffer = bytes
154
136
def _get_push_back_buffer(self):
155
if self._push_back_buffer == b'':
137
if self._push_back_buffer == '':
156
138
raise AssertionError(
157
139
'%s._push_back_buffer should never be the empty string, '
158
140
'which can be confused with EOF' % (self,))
251
233
while not self.finished:
252
234
server_protocol = self._build_protocol()
253
235
self._serve_one_request(server_protocol)
254
except errors.ConnectionTimeout as e:
236
except errors.ConnectionTimeout, e:
255
237
trace.note('%s' % (e,))
256
238
trace.log_exception_quietly()
257
239
self._disconnect_client()
258
240
# We reported it, no reason to make a big fuss.
260
except Exception as e:
261
243
stderr.write("%s terminating on exception %s\n" % (self, e))
263
245
self._disconnect_client()
326
308
# select.error doesn't have 'errno', it just has args[0]
328
310
if err in _bad_file_descriptor:
329
return # Not a socket indicates read() will fail
311
return # Not a socket indicates read() will fail
330
312
elif err == errno.EINTR:
331
313
# Interrupted, keep looping.
335
return # Socket may already be closed
338
318
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
397
377
# than MAX_SOCKET_CHUNK ready, the socket will just return a
398
378
# short read immediately rather than block.
399
379
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
401
381
self.finished = True
403
383
protocol.accept_bytes(bytes)
431
411
self.finished = True
433
413
def _write_out(self, bytes):
434
tstart = osutils.perf_counter()
414
tstart = osutils.timer_func()
435
415
osutils.send_all(self.socket, bytes, self._report_activity)
436
416
if 'hpss' in debug.debug_flags:
437
thread_id = _thread.get_ident()
417
thread_id = thread.get_ident()
438
418
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
439
419
% ('wrote', thread_id, len(bytes),
440
osutils.perf_counter() - tstart))
420
osutils.timer_func() - tstart))
443
423
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
450
430
:param backing_transport: Transport for the directory served.
452
432
SmartServerStreamMedium.__init__(self, backing_transport,
454
434
if sys.platform == 'win32':
455
435
# force binary mode for files
506
486
data is available.
508
488
if (getattr(self._in, 'fileno', None) is None
509
or sys.platform == 'win32'):
489
or sys.platform == 'win32'):
510
490
# You can't select() file descriptors on Windows.
513
return self._wait_on_descriptor(self._in, timeout_seconds)
514
except io.UnsupportedOperation:
492
return self._wait_on_descriptor(self._in, timeout_seconds)
517
494
def _read_bytes(self, desired_count):
518
495
return self._in.read(desired_count)
648
625
def read_line(self):
649
626
line = self._read_line()
650
if not line.endswith(b'\n'):
627
if not line.endswith('\n'):
651
628
# end of file encountered reading from server
652
629
raise errors.ConnectionReset(
653
630
"Unexpected end of message. Please check connectivity "
679
656
# A method we don't know about doesn't count as a VFS method.
681
658
if issubclass(request_method, vfs.VfsRequest):
682
raise HpssVfsRequestNotAllowed(params.method, params.args)
659
raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
685
662
class _DebugCounter(object):
686
663
"""An object that counts the HPSS calls made to each client medium.
688
665
When a medium is garbage-collected, or failing that when
689
breezy.global_state exits, the total number of calls made on that medium
666
brzlib.global_state exits, the total number of calls made on that medium
690
667
are reported via trace.note.
694
671
self.counts = weakref.WeakKeyDictionary()
695
672
client._SmartClient.hooks.install_named_hook(
696
673
'call', self.increment_call_count, 'hpss call counter')
697
breezy.get_global_state().exit_stack.callback(self.flush_all)
674
brzlib.global_state.cleanups.add_cleanup(self.flush_all)
699
676
def track(self, medium):
700
677
"""Start tracking calls made to a medium.
799
775
:seealso: _is_remote_before
801
777
if (self._remote_version_is_before is not None and
802
version_tuple > self._remote_version_is_before):
778
version_tuple > self._remote_version_is_before):
803
779
# We have been told that the remote side is older than some version
804
780
# which is newer than a previously supplied older-than version.
805
781
# This indicates that some smart verb call is not guarded
806
782
# appropriately (it should simply not have been tried).
808
784
"_remember_remote_is_before(%r) called, but "
809
"_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
785
"_remember_remote_is_before(%r) was called previously."
786
, version_tuple, self._remote_version_is_before)
810
787
if 'hpss' in debug.debug_flags:
811
788
ui.ui_factory.show_warning(
812
789
"_remember_remote_is_before(%r) called, but "
824
801
medium_request = self.get_request()
825
802
# Send a 'hello' request in protocol version one, for maximum
826
803
# backwards compatibility.
827
client_protocol = protocol.SmartClientRequestProtocolOne(
804
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
829
805
client_protocol.query_version()
830
806
self._done_hello = True
831
except errors.SmartProtocolError as e:
807
except errors.SmartProtocolError, e:
832
808
# Cache the error, just like we would cache a successful
834
810
self._protocol_version_error = e
927
903
self._readable_pipe = readable_pipe
928
904
self._writeable_pipe = writeable_pipe
930
def _accept_bytes(self, data):
906
def _accept_bytes(self, bytes):
931
907
"""See SmartClientStreamMedium.accept_bytes."""
933
self._writeable_pipe.write(data)
909
self._writeable_pipe.write(bytes)
935
911
if e.errno in (errno.EINVAL, errno.EPIPE):
936
912
raise errors.ConnectionReset(
937
913
"Error trying to write to subprocess", e)
939
self._report_activity(len(data), 'write')
915
self._report_activity(len(bytes), 'write')
941
917
def _flush(self):
942
918
"""See SmartClientStreamMedium._flush()."""
948
924
def _read_bytes(self, count):
949
925
"""See SmartClientStreamMedium._read_bytes."""
950
926
bytes_to_read = min(count, _MAX_READ_SIZE)
951
data = self._readable_pipe.read(bytes_to_read)
952
self._report_activity(len(data), 'read')
927
bytes = self._readable_pipe.read(bytes_to_read)
928
self._report_activity(len(bytes), 'read')
956
932
class SSHParams(object):
957
933
"""A set of parameters for starting a remote bzr via SSH."""
959
935
def __init__(self, host, port=None, username=None, password=None,
960
bzr_remote_path='bzr'):
936
bzr_remote_path='bzr'):
963
939
self.username = username
978
954
:param ssh_params: A SSHParams instance.
979
955
:param vendor: An optional override for the ssh vendor to use. See
980
breezy.transport.ssh for details on ssh vendors.
956
brzlib.transport.ssh for details on ssh vendors.
982
958
self._real_medium = None
983
959
self._ssh_params = ssh_params
1031
1007
vendor = self._vendor
1032
1008
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1033
self._ssh_params.password, self._ssh_params.host,
1034
self._ssh_params.port,
1035
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1036
'--directory=/', '--allow-writes'])
1009
self._ssh_params.password, self._ssh_params.host,
1010
self._ssh_params.port,
1011
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1012
'--directory=/', '--allow-writes'])
1037
1013
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1038
1014
if io_kind == 'socket':
1039
1015
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1128
1104
port = int(self._port)
1130
1106
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1131
socket.SOCK_STREAM, 0, 0)
1132
except socket.gaierror as xxx_todo_changeme:
1133
(err_num, err_msg) = xxx_todo_changeme.args
1107
socket.SOCK_STREAM, 0, 0)
1108
except socket.gaierror, (err_num, err_msg):
1134
1109
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1135
(self._host, port, err_msg))
1110
(self._host, port, err_msg))
1136
1111
# Initialize err in case there are no addresses returned:
1137
last_err = socket.error("no address found for %s" % self._host)
1112
err = socket.error("no address found for %s" % self._host)
1138
1113
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1140
1115
self._socket = socket.socket(family, socktype, proto)
1141
1116
self._socket.setsockopt(socket.IPPROTO_TCP,
1142
1117
socket.TCP_NODELAY, 1)
1143
1118
self._socket.connect(sockaddr)
1144
except socket.error as err:
1119
except socket.error, err:
1145
1120
if self._socket is not None:
1146
1121
self._socket.close()
1147
1122
self._socket = None
1151
1125
if self._socket is None:
1152
1126
# socket errors either have a (string) or (errno, string) as their
1154
if isinstance(last_err.args, str):
1155
err_msg = last_err.args
1128
if type(err.args) is str:
1157
err_msg = last_err.args[1]
1131
err_msg = err.args[1]
1158
1132
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1159
(self._host, port, err_msg))
1133
(self._host, port, err_msg))
1160
1134
self._connected = True
1161
1135
for hook in transport.Transport.hooks["post_connect"]:
1165
1139
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1166
1140
"""A client medium for an already connected socket.
1168
1142
Note that this class will assume it "owns" the socket, so it will close it
1169
1143
when its disconnect method is called.