22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
breezy/transport/smart/__init__.py.
24
bzrlib/transport/smart/__init__.py.
36
import thread as _thread
39
from ...lazy_import import lazy_import
32
from bzrlib.lazy_import import lazy_import
40
33
lazy_import(globals(), """
52
from breezy.i18n import gettext
53
from breezy.bzr.smart import client, protocol, request, signals, vfs
54
from breezy.transport import ssh
46
from bzrlib.smart import client, protocol, request, vfs
47
from bzrlib.transport import ssh
49
from bzrlib import osutils
61
51
# Throughout this module buffer size parameters are either limited to be at
62
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
64
54
# from non-sockets as well.
65
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
68
class HpssVfsRequestNotAllowed(errors.BzrError):
70
_fmt = ("VFS requests over the smart server are not allowed. Encountered: "
71
"%(method)s, %(arguments)s.")
73
def __init__(self, method, arguments):
75
self.arguments = arguments
78
57
def _get_protocol_factory_for_bytes(bytes):
79
58
"""Determine the right protocol factory for 'bytes'.
116
95
:returns: a tuple of two strs: (line, excess)
120
99
while newline_pos == -1:
121
100
new_bytes = read_bytes_func(1)
122
101
bytes += new_bytes
124
103
# Ran out of bytes before receiving a complete line.
126
newline_pos = bytes.find(b'\n')
127
line = bytes[:newline_pos + 1]
128
excess = bytes[newline_pos + 1:]
105
newline_pos = bytes.find('\n')
106
line = bytes[:newline_pos+1]
107
excess = bytes[newline_pos+1:]
129
108
return line, excess
135
114
def __init__(self):
136
115
self._push_back_buffer = None
138
def _push_back(self, data):
117
def _push_back(self, bytes):
139
118
"""Return unused bytes to the medium, because they belong to the next
142
121
This sets the _push_back_buffer to the given bytes.
144
if not isinstance(data, bytes):
145
raise TypeError(data)
146
123
if self._push_back_buffer is not None:
147
124
raise AssertionError(
148
125
"_push_back called when self._push_back_buffer is %r"
149
126
% (self._push_back_buffer,))
152
self._push_back_buffer = data
129
self._push_back_buffer = bytes
154
131
def _get_push_back_buffer(self):
155
if self._push_back_buffer == b'':
132
if self._push_back_buffer == '':
156
133
raise AssertionError(
157
134
'%s._push_back_buffer should never be the empty string, '
158
135
'which can be confused with EOF' % (self,))
199
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
202
_bad_file_descriptor = (errno.EBADF,)
203
if sys.platform == 'win32':
204
# Given on Windows if you pass a closed socket to select.select. Probably
205
# also given if you pass a file handle to select.
207
_bad_file_descriptor += (WSAENOTSOCK,)
210
179
class SmartServerStreamMedium(SmartMedium):
211
180
"""Handles smart commands coming over a stream.
225
194
the stream. See also the _push_back method.
230
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
def __init__(self, backing_transport, root_client_path='/'):
231
198
"""Construct new server.
233
200
:param backing_transport: Transport for the directory served.
236
203
self.backing_transport = backing_transport
237
204
self.root_client_path = root_client_path
238
205
self.finished = False
240
raise AssertionError('You must supply a timeout.')
241
self._client_timeout = timeout
242
self._client_poll_timeout = min(timeout / 10.0, 1.0)
243
206
SmartMedium.__init__(self)
251
214
while not self.finished:
252
215
server_protocol = self._build_protocol()
253
216
self._serve_one_request(server_protocol)
254
except errors.ConnectionTimeout as e:
255
trace.note('%s' % (e,))
256
trace.log_exception_quietly()
257
self._disconnect_client()
258
# We reported it, no reason to make a big fuss.
260
except Exception as e:
261
218
stderr.write("%s terminating on exception %s\n" % (self, e))
263
self._disconnect_client()
265
def _stop_gracefully(self):
266
"""When we finish this message, stop looking for more."""
267
trace.mutter('Stopping %s' % (self,))
270
def _disconnect_client(self):
271
"""Close the current connection. We stopped due to a timeout/etc."""
272
# The default implementation is a no-op, because that is all we used to
273
# do when disconnecting from a client. I suppose we never had the
274
# *server* initiate a disconnect, before
276
def _wait_for_bytes_with_timeout(self, timeout_seconds):
277
"""Wait for more bytes to be read, but timeout if none available.
279
This allows us to detect idle connections, and stop trying to read from
280
them, without setting the socket itself to non-blocking. This also
281
allows us to specify when we watch for idle timeouts.
283
:return: Did we timeout? (True if we timed out, False if there is data
286
raise NotImplementedError(self._wait_for_bytes_with_timeout)
288
221
def _build_protocol(self):
289
222
"""Identifies the version of the incoming request, and returns an
295
228
:returns: a SmartServerRequestProtocol.
297
self._wait_for_bytes_with_timeout(self._client_timeout)
299
# We're stopping, so don't try to do any more work
301
230
bytes = self._get_line()
302
231
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
303
232
protocol = protocol_factory(
305
234
protocol.accept_bytes(unused_bytes)
308
def _wait_on_descriptor(self, fd, timeout_seconds):
309
"""select() on a file descriptor, waiting for nonblocking read()
311
This will raise a ConnectionTimeout exception if we do not get a
312
readable handle before timeout_seconds.
315
t_end = self._timer() + timeout_seconds
316
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
318
while not rs and not xs and self._timer() < t_end:
322
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
323
except (select.error, socket.error) as e:
324
err = getattr(e, 'errno', None)
325
if err is None and getattr(e, 'args', None) is not None:
326
# select.error doesn't have 'errno', it just has args[0]
328
if err in _bad_file_descriptor:
329
return # Not a socket indicates read() will fail
330
elif err == errno.EINTR:
331
# Interrupted, keep looping.
335
return # Socket may already be closed
338
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
339
% (timeout_seconds,))
341
237
def _serve_one_request(self, protocol):
342
238
"""Read one request from input, process, send back a response.
344
240
:param protocol: a SmartServerRequestProtocol.
349
243
self._serve_one_request_unguarded(protocol)
350
244
except KeyboardInterrupt:
352
except Exception as e:
353
247
self.terminate_due_to_error()
355
249
def terminate_due_to_error(self):
367
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
369
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
373
266
:param sock: the socket the server will read from. It will be put
374
267
into blocking mode.
376
269
SmartServerStreamMedium.__init__(
377
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
379
271
sock.setblocking(True)
380
272
self.socket = sock
381
# Get the getpeername now, as we might be closed later when we care.
383
self._client_info = sock.getpeername()
385
self._client_info = '<unknown>'
388
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
391
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
394
274
def _serve_one_request_unguarded(self, protocol):
395
275
while protocol.next_read_size():
397
277
# than MAX_SOCKET_CHUNK ready, the socket will just return a
398
278
# short read immediately rather than block.
399
279
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
401
281
self.finished = True
403
283
protocol.accept_bytes(bytes)
405
285
self._push_back(protocol.unused_data)
407
def _disconnect_client(self):
408
"""Close the current connection. We stopped due to a timeout/etc."""
411
def _wait_for_bytes_with_timeout(self, timeout_seconds):
412
"""Wait for more bytes to be read, but timeout if none available.
414
This allows us to detect idle connections, and stop trying to read from
415
them, without setting the socket itself to non-blocking. This also
416
allows us to specify when we watch for idle timeouts.
418
:return: None, this will raise ConnectionTimeout if we time out before
421
return self._wait_on_descriptor(self.socket, timeout_seconds)
423
287
def _read_bytes(self, desired_count):
424
288
return osutils.read_bytes_from_socket(
425
289
self.socket, self._report_activity)
431
295
self.finished = True
433
297
def _write_out(self, bytes):
434
tstart = osutils.perf_counter()
298
tstart = osutils.timer_func()
435
299
osutils.send_all(self.socket, bytes, self._report_activity)
436
300
if 'hpss' in debug.debug_flags:
437
thread_id = _thread.get_ident()
301
thread_id = thread.get_ident()
438
302
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
439
303
% ('wrote', thread_id, len(bytes),
440
osutils.perf_counter() - tstart))
304
osutils.timer_func() - tstart))
443
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
445
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
446
310
"""Construct new server.
448
312
:param in_file: Python file from which requests can be read.
449
313
:param out_file: Python file to write responses.
450
314
:param backing_transport: Transport for the directory served.
452
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
454
317
if sys.platform == 'win32':
455
318
# force binary mode for files
461
324
self._in = in_file
462
325
self._out = out_file
465
"""See SmartServerStreamMedium.serve"""
466
# This is the regular serve, except it adds signal trapping for soft
468
stop_gracefully = self._stop_gracefully
469
signals.register_on_hangup(id(self), stop_gracefully)
471
return super(SmartServerPipeStreamMedium, self).serve()
473
signals.unregister_on_hangup(id(self))
475
327
def _serve_one_request_unguarded(self, protocol):
477
329
# We need to be careful not to read past the end of the current
483
335
self._out.flush()
485
337
bytes = self.read_bytes(bytes_to_read)
487
339
# Connection has been closed.
488
340
self.finished = True
489
341
self._out.flush()
491
343
protocol.accept_bytes(bytes)
493
def _disconnect_client(self):
498
def _wait_for_bytes_with_timeout(self, timeout_seconds):
499
"""Wait for more bytes to be read, but timeout if none available.
501
This allows us to detect idle connections, and stop trying to read from
502
them, without setting the socket itself to non-blocking. This also
503
allows us to specify when we watch for idle timeouts.
505
:return: None, this will raise ConnectionTimeout if we time out before
508
if (getattr(self._in, 'fileno', None) is None
509
or sys.platform == 'win32'):
510
# You can't select() file descriptors on Windows.
513
return self._wait_on_descriptor(self._in, timeout_seconds)
514
except io.UnsupportedOperation:
517
345
def _read_bytes(self, desired_count):
518
346
return self._in.read(desired_count)
663
491
return self._medium._get_line()
666
class _VfsRefuser(object):
667
"""An object that refuses all VFS requests.
672
client._SmartClient.hooks.install_named_hook(
673
'call', self.check_vfs, 'vfs refuser')
675
def check_vfs(self, params):
677
request_method = request.request_handlers.get(params.method)
679
# A method we don't know about doesn't count as a VFS method.
681
if issubclass(request_method, vfs.VfsRequest):
682
raise HpssVfsRequestNotAllowed(params.method, params.args)
685
494
class _DebugCounter(object):
686
495
"""An object that counts the HPSS calls made to each client medium.
688
497
When a medium is garbage-collected, or failing that when
689
breezy.global_state exits, the total number of calls made on that medium
498
bzrlib.global_state exits, the total number of calls made on that medium
690
499
are reported via trace.note.
694
503
self.counts = weakref.WeakKeyDictionary()
695
504
client._SmartClient.hooks.install_named_hook(
696
505
'call', self.increment_call_count, 'hpss call counter')
697
breezy.get_global_state().exit_stack.callback(self.flush_all)
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
699
508
def track(self, medium):
700
509
"""Start tracking calls made to a medium.
734
543
value['count'] = 0
735
544
value['vfs_count'] = 0
737
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
738
count, vfs_count, medium_repr))
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
740
549
def flush_all(self):
741
550
for ref in list(self.counts.keys()):
745
553
_debug_counter = None
749
556
class SmartClientMedium(SmartMedium):
766
573
if _debug_counter is None:
767
574
_debug_counter = _DebugCounter()
768
575
_debug_counter.track(self)
769
if 'hpss_client_no_vfs' in debug.debug_flags:
771
if _vfs_refuser is None:
772
_vfs_refuser = _VfsRefuser()
774
577
def _is_remote_before(self, version_tuple):
775
578
"""Is it possible the remote side supports RPCs for a given version?
799
602
:seealso: _is_remote_before
801
604
if (self._remote_version_is_before is not None and
802
version_tuple > self._remote_version_is_before):
605
version_tuple > self._remote_version_is_before):
803
606
# We have been told that the remote side is older than some version
804
607
# which is newer than a previously supplied older-than version.
805
608
# This indicates that some smart verb call is not guarded
806
609
# appropriately (it should simply not have been tried).
808
611
"_remember_remote_is_before(%r) called, but "
809
"_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
612
"_remember_remote_is_before(%r) was called previously."
613
, version_tuple, self._remote_version_is_before)
810
614
if 'hpss' in debug.debug_flags:
811
615
ui.ui_factory.show_warning(
812
616
"_remember_remote_is_before(%r) called, but "
824
628
medium_request = self.get_request()
825
629
# Send a 'hello' request in protocol version one, for maximum
826
630
# backwards compatibility.
827
client_protocol = protocol.SmartClientRequestProtocolOne(
631
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
829
632
client_protocol.query_version()
830
633
self._done_hello = True
831
except errors.SmartProtocolError as e:
634
except errors.SmartProtocolError, e:
832
635
# Cache the error, just like we would cache a successful
834
637
self._protocol_version_error = e
867
670
medium_base = urlutils.join(self.base, '/')
868
671
rel_url = urlutils.relative_url(medium_base, transport.base)
869
return urlutils.unquote(rel_url)
672
return urllib.unquote(rel_url)
872
675
class SmartClientStreamMedium(SmartClientMedium):
908
711
return SmartClientStreamMediumRequest(self)
911
"""We have been disconnected, reset current state.
913
This resets things like _current_request and connected state.
916
self._current_request = None
919
714
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
920
715
"""A client medium using simple pipes.
927
722
self._readable_pipe = readable_pipe
928
723
self._writeable_pipe = writeable_pipe
930
def _accept_bytes(self, data):
725
def _accept_bytes(self, bytes):
931
726
"""See SmartClientStreamMedium.accept_bytes."""
933
self._writeable_pipe.write(data)
935
if e.errno in (errno.EINVAL, errno.EPIPE):
936
raise errors.ConnectionReset(
937
"Error trying to write to subprocess", e)
939
self._report_activity(len(data), 'write')
727
self._writeable_pipe.write(bytes)
728
self._report_activity(len(bytes), 'write')
941
730
def _flush(self):
942
731
"""See SmartClientStreamMedium._flush()."""
943
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
944
# However, testing shows that even when the child process is
945
# gone, this doesn't error.
946
732
self._writeable_pipe.flush()
948
734
def _read_bytes(self, count):
949
735
"""See SmartClientStreamMedium._read_bytes."""
950
736
bytes_to_read = min(count, _MAX_READ_SIZE)
951
data = self._readable_pipe.read(bytes_to_read)
952
self._report_activity(len(data), 'read')
737
bytes = self._readable_pipe.read(bytes_to_read)
738
self._report_activity(len(bytes), 'read')
956
742
class SSHParams(object):
957
743
"""A set of parameters for starting a remote bzr via SSH."""
959
745
def __init__(self, host, port=None, username=None, password=None,
960
bzr_remote_path='bzr'):
746
bzr_remote_path='bzr'):
963
749
self.username = username
998
784
maybe_port = ':%s' % self._ssh_params.port
999
if self._ssh_params.username is None:
1002
maybe_user = '%s@' % self._ssh_params.username
1003
return "%s(%s://%s%s%s/)" % (
785
return "%s(%s://%s@%s%s/)" % (
1004
786
self.__class__.__name__,
788
self._ssh_params.username,
1007
789
self._ssh_params.host,
1031
813
vendor = self._vendor
1032
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1033
self._ssh_params.password, self._ssh_params.host,
1034
self._ssh_params.port,
1035
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1036
'--directory=/', '--allow-writes'])
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
818
'--directory=/', '--allow-writes'])
1037
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1038
820
if io_kind == 'socket':
1039
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1128
908
port = int(self._port)
1130
910
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1131
socket.SOCK_STREAM, 0, 0)
1132
except socket.gaierror as xxx_todo_changeme:
1133
(err_num, err_msg) = xxx_todo_changeme.args
911
socket.SOCK_STREAM, 0, 0)
912
except socket.gaierror, (err_num, err_msg):
1134
913
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1135
(self._host, port, err_msg))
914
(self._host, port, err_msg))
1136
915
# Initialize err in case there are no addresses returned:
1137
last_err = socket.error("no address found for %s" % self._host)
916
err = socket.error("no address found for %s" % self._host)
1138
917
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1140
919
self._socket = socket.socket(family, socktype, proto)
1141
920
self._socket.setsockopt(socket.IPPROTO_TCP,
1142
921
socket.TCP_NODELAY, 1)
1143
922
self._socket.connect(sockaddr)
1144
except socket.error as err:
923
except socket.error, err:
1145
924
if self._socket is not None:
1146
925
self._socket.close()
1147
926
self._socket = None
1151
929
if self._socket is None:
1152
930
# socket errors either have a (string) or (errno, string) as their
1154
if isinstance(last_err.args, str):
1155
err_msg = last_err.args
932
if type(err.args) is str:
1157
err_msg = last_err.args[1]
935
err_msg = err.args[1]
1158
936
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1159
(self._host, port, err_msg))
937
(self._host, port, err_msg))
1160
938
self._connected = True
1161
for hook in transport.Transport.hooks["post_connect"]:
1165
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1166
942
"""A client medium for an already connected socket.
1168
944
Note that this class will assume it "owns" the socket, so it will close it
1169
945
when its disconnect method is called.