22
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
breezy/transport/smart/__init__.py.
24
bzrlib/transport/smart/__init__.py.
27
from __future__ import absolute_import
38
import thread as _thread
41
from ...lazy_import import lazy_import
32
from bzrlib.lazy_import import lazy_import
42
33
lazy_import(globals(), """
54
from breezy.i18n import gettext
55
from breezy.bzr.smart import client, protocol, request, signals, vfs
56
from breezy.transport import ssh
46
from bzrlib.smart import client, protocol, request, vfs
47
from bzrlib.transport import ssh
49
from bzrlib import osutils
63
51
# Throughout this module buffer size parameters are either limited to be at
64
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
66
54
# from non-sockets as well.
67
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
70
class HpssVfsRequestNotAllowed(errors.BzrError):
72
_fmt = ("VFS requests over the smart server are not allowed. Encountered: "
73
"%(method)s, %(arguments)s.")
75
def __init__(self, method, arguments):
77
self.arguments = arguments
80
57
def _get_protocol_factory_for_bytes(bytes):
81
58
"""Determine the right protocol factory for 'bytes'.
118
95
:returns: a tuple of two strs: (line, excess)
122
99
while newline_pos == -1:
123
100
new_bytes = read_bytes_func(1)
124
101
bytes += new_bytes
126
103
# Ran out of bytes before receiving a complete line.
128
newline_pos = bytes.find(b'\n')
129
line = bytes[:newline_pos + 1]
130
excess = bytes[newline_pos + 1:]
105
newline_pos = bytes.find('\n')
106
line = bytes[:newline_pos+1]
107
excess = bytes[newline_pos+1:]
131
108
return line, excess
137
114
def __init__(self):
138
115
self._push_back_buffer = None
140
def _push_back(self, data):
117
def _push_back(self, bytes):
141
118
"""Return unused bytes to the medium, because they belong to the next
144
121
This sets the _push_back_buffer to the given bytes.
146
if not isinstance(data, bytes):
147
raise TypeError(data)
148
123
if self._push_back_buffer is not None:
149
124
raise AssertionError(
150
125
"_push_back called when self._push_back_buffer is %r"
151
126
% (self._push_back_buffer,))
154
self._push_back_buffer = data
129
self._push_back_buffer = bytes
156
131
def _get_push_back_buffer(self):
157
if self._push_back_buffer == b'':
132
if self._push_back_buffer == '':
158
133
raise AssertionError(
159
134
'%s._push_back_buffer should never be the empty string, '
160
135
'which can be confused with EOF' % (self,))
201
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
204
_bad_file_descriptor = (errno.EBADF,)
205
if sys.platform == 'win32':
206
# Given on Windows if you pass a closed socket to select.select. Probably
207
# also given if you pass a file handle to select.
209
_bad_file_descriptor += (WSAENOTSOCK,)
212
179
class SmartServerStreamMedium(SmartMedium):
213
180
"""Handles smart commands coming over a stream.
227
194
the stream. See also the _push_back method.
232
def __init__(self, backing_transport, root_client_path='/', timeout=None):
197
def __init__(self, backing_transport, root_client_path='/'):
233
198
"""Construct new server.
235
200
:param backing_transport: Transport for the directory served.
238
203
self.backing_transport = backing_transport
239
204
self.root_client_path = root_client_path
240
205
self.finished = False
242
raise AssertionError('You must supply a timeout.')
243
self._client_timeout = timeout
244
self._client_poll_timeout = min(timeout / 10.0, 1.0)
245
206
SmartMedium.__init__(self)
253
214
while not self.finished:
254
215
server_protocol = self._build_protocol()
255
216
self._serve_one_request(server_protocol)
256
except errors.ConnectionTimeout as e:
257
trace.note('%s' % (e,))
258
trace.log_exception_quietly()
259
self._disconnect_client()
260
# We reported it, no reason to make a big fuss.
262
except Exception as e:
263
218
stderr.write("%s terminating on exception %s\n" % (self, e))
265
self._disconnect_client()
267
def _stop_gracefully(self):
268
"""When we finish this message, stop looking for more."""
269
trace.mutter('Stopping %s' % (self,))
272
def _disconnect_client(self):
273
"""Close the current connection. We stopped due to a timeout/etc."""
274
# The default implementation is a no-op, because that is all we used to
275
# do when disconnecting from a client. I suppose we never had the
276
# *server* initiate a disconnect, before
278
def _wait_for_bytes_with_timeout(self, timeout_seconds):
279
"""Wait for more bytes to be read, but timeout if none available.
281
This allows us to detect idle connections, and stop trying to read from
282
them, without setting the socket itself to non-blocking. This also
283
allows us to specify when we watch for idle timeouts.
285
:return: Did we timeout? (True if we timed out, False if there is data
288
raise NotImplementedError(self._wait_for_bytes_with_timeout)
290
221
def _build_protocol(self):
291
222
"""Identifies the version of the incoming request, and returns an
297
228
:returns: a SmartServerRequestProtocol.
299
self._wait_for_bytes_with_timeout(self._client_timeout)
301
# We're stopping, so don't try to do any more work
303
230
bytes = self._get_line()
304
231
protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
305
232
protocol = protocol_factory(
307
234
protocol.accept_bytes(unused_bytes)
310
def _wait_on_descriptor(self, fd, timeout_seconds):
311
"""select() on a file descriptor, waiting for nonblocking read()
313
This will raise a ConnectionTimeout exception if we do not get a
314
readable handle before timeout_seconds.
317
t_end = self._timer() + timeout_seconds
318
poll_timeout = min(timeout_seconds, self._client_poll_timeout)
320
while not rs and not xs and self._timer() < t_end:
324
rs, _, xs = select.select([fd], [], [fd], poll_timeout)
325
except (select.error, socket.error) as e:
326
err = getattr(e, 'errno', None)
327
if err is None and getattr(e, 'args', None) is not None:
328
# select.error doesn't have 'errno', it just has args[0]
330
if err in _bad_file_descriptor:
331
return # Not a socket indicates read() will fail
332
elif err == errno.EINTR:
333
# Interrupted, keep looping.
337
return # Socket may already be closed
340
raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
341
% (timeout_seconds,))
343
237
def _serve_one_request(self, protocol):
344
238
"""Read one request from input, process, send back a response.
346
240
:param protocol: a SmartServerRequestProtocol.
351
243
self._serve_one_request_unguarded(protocol)
352
244
except KeyboardInterrupt:
354
except Exception as e:
355
247
self.terminate_due_to_error()
357
249
def terminate_due_to_error(self):
369
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
371
def __init__(self, sock, backing_transport, root_client_path='/',
263
def __init__(self, sock, backing_transport, root_client_path='/'):
375
266
:param sock: the socket the server will read from. It will be put
376
267
into blocking mode.
378
269
SmartServerStreamMedium.__init__(
379
self, backing_transport, root_client_path=root_client_path,
270
self, backing_transport, root_client_path=root_client_path)
381
271
sock.setblocking(True)
382
272
self.socket = sock
383
# Get the getpeername now, as we might be closed later when we care.
385
self._client_info = sock.getpeername()
387
self._client_info = '<unknown>'
390
return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
393
return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
396
274
def _serve_one_request_unguarded(self, protocol):
397
275
while protocol.next_read_size():
399
277
# than MAX_SOCKET_CHUNK ready, the socket will just return a
400
278
# short read immediately rather than block.
401
279
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
403
281
self.finished = True
405
283
protocol.accept_bytes(bytes)
407
285
self._push_back(protocol.unused_data)
409
def _disconnect_client(self):
410
"""Close the current connection. We stopped due to a timeout/etc."""
413
def _wait_for_bytes_with_timeout(self, timeout_seconds):
414
"""Wait for more bytes to be read, but timeout if none available.
416
This allows us to detect idle connections, and stop trying to read from
417
them, without setting the socket itself to non-blocking. This also
418
allows us to specify when we watch for idle timeouts.
420
:return: None, this will raise ConnectionTimeout if we time out before
423
return self._wait_on_descriptor(self.socket, timeout_seconds)
425
287
def _read_bytes(self, desired_count):
426
288
return osutils.read_bytes_from_socket(
427
289
self.socket, self._report_activity)
433
295
self.finished = True
435
297
def _write_out(self, bytes):
436
tstart = osutils.perf_counter()
298
tstart = osutils.timer_func()
437
299
osutils.send_all(self.socket, bytes, self._report_activity)
438
300
if 'hpss' in debug.debug_flags:
439
thread_id = _thread.get_ident()
301
thread_id = thread.get_ident()
440
302
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
441
303
% ('wrote', thread_id, len(bytes),
442
osutils.perf_counter() - tstart))
304
osutils.timer_func() - tstart))
445
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
447
def __init__(self, in_file, out_file, backing_transport, timeout=None):
309
def __init__(self, in_file, out_file, backing_transport):
448
310
"""Construct new server.
450
312
:param in_file: Python file from which requests can be read.
451
313
:param out_file: Python file to write responses.
452
314
:param backing_transport: Transport for the directory served.
454
SmartServerStreamMedium.__init__(self, backing_transport,
316
SmartServerStreamMedium.__init__(self, backing_transport)
456
317
if sys.platform == 'win32':
457
318
# force binary mode for files
463
324
self._in = in_file
464
325
self._out = out_file
467
"""See SmartServerStreamMedium.serve"""
468
# This is the regular serve, except it adds signal trapping for soft
470
stop_gracefully = self._stop_gracefully
471
signals.register_on_hangup(id(self), stop_gracefully)
473
return super(SmartServerPipeStreamMedium, self).serve()
475
signals.unregister_on_hangup(id(self))
477
327
def _serve_one_request_unguarded(self, protocol):
479
329
# We need to be careful not to read past the end of the current
485
335
self._out.flush()
487
337
bytes = self.read_bytes(bytes_to_read)
489
339
# Connection has been closed.
490
340
self.finished = True
491
341
self._out.flush()
493
343
protocol.accept_bytes(bytes)
495
def _disconnect_client(self):
500
def _wait_for_bytes_with_timeout(self, timeout_seconds):
501
"""Wait for more bytes to be read, but timeout if none available.
503
This allows us to detect idle connections, and stop trying to read from
504
them, without setting the socket itself to non-blocking. This also
505
allows us to specify when we watch for idle timeouts.
507
:return: None, this will raise ConnectionTimeout if we time out before
510
if (getattr(self._in, 'fileno', None) is None
511
or sys.platform == 'win32'):
512
# You can't select() file descriptors on Windows.
515
return self._wait_on_descriptor(self._in, timeout_seconds)
516
except io.UnsupportedOperation:
519
345
def _read_bytes(self, desired_count):
520
346
return self._in.read(desired_count)
665
491
return self._medium._get_line()
668
class _VfsRefuser(object):
669
"""An object that refuses all VFS requests.
674
client._SmartClient.hooks.install_named_hook(
675
'call', self.check_vfs, 'vfs refuser')
677
def check_vfs(self, params):
679
request_method = request.request_handlers.get(params.method)
681
# A method we don't know about doesn't count as a VFS method.
683
if issubclass(request_method, vfs.VfsRequest):
684
raise HpssVfsRequestNotAllowed(params.method, params.args)
687
494
class _DebugCounter(object):
688
495
"""An object that counts the HPSS calls made to each client medium.
690
497
When a medium is garbage-collected, or failing that when
691
breezy.global_state exits, the total number of calls made on that medium
498
bzrlib.global_state exits, the total number of calls made on that medium
692
499
are reported via trace.note.
696
503
self.counts = weakref.WeakKeyDictionary()
697
504
client._SmartClient.hooks.install_named_hook(
698
505
'call', self.increment_call_count, 'hpss call counter')
699
breezy.get_global_state().exit_stack.callback(self.flush_all)
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
701
508
def track(self, medium):
702
509
"""Start tracking calls made to a medium.
736
543
value['count'] = 0
737
544
value['vfs_count'] = 0
739
trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
740
count, vfs_count, medium_repr))
546
trace.note('HPSS calls: %d (%d vfs) %s',
547
count, vfs_count, medium_repr)
742
549
def flush_all(self):
743
550
for ref in list(self.counts.keys()):
747
553
_debug_counter = None
751
556
class SmartClientMedium(SmartMedium):
768
573
if _debug_counter is None:
769
574
_debug_counter = _DebugCounter()
770
575
_debug_counter.track(self)
771
if 'hpss_client_no_vfs' in debug.debug_flags:
773
if _vfs_refuser is None:
774
_vfs_refuser = _VfsRefuser()
776
577
def _is_remote_before(self, version_tuple):
777
578
"""Is it possible the remote side supports RPCs for a given version?
801
602
:seealso: _is_remote_before
803
604
if (self._remote_version_is_before is not None and
804
version_tuple > self._remote_version_is_before):
605
version_tuple > self._remote_version_is_before):
805
606
# We have been told that the remote side is older than some version
806
607
# which is newer than a previously supplied older-than version.
807
608
# This indicates that some smart verb call is not guarded
808
609
# appropriately (it should simply not have been tried).
810
611
"_remember_remote_is_before(%r) called, but "
811
"_remember_remote_is_before(%r) was called previously.", version_tuple, self._remote_version_is_before)
612
"_remember_remote_is_before(%r) was called previously."
613
, version_tuple, self._remote_version_is_before)
812
614
if 'hpss' in debug.debug_flags:
813
615
ui.ui_factory.show_warning(
814
616
"_remember_remote_is_before(%r) called, but "
826
628
medium_request = self.get_request()
827
629
# Send a 'hello' request in protocol version one, for maximum
828
630
# backwards compatibility.
829
client_protocol = protocol.SmartClientRequestProtocolOne(
631
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
831
632
client_protocol.query_version()
832
633
self._done_hello = True
833
except errors.SmartProtocolError as e:
634
except errors.SmartProtocolError, e:
834
635
# Cache the error, just like we would cache a successful
836
637
self._protocol_version_error = e
869
670
medium_base = urlutils.join(self.base, '/')
870
671
rel_url = urlutils.relative_url(medium_base, transport.base)
871
return urlutils.unquote(rel_url)
672
return urllib.unquote(rel_url)
874
675
class SmartClientStreamMedium(SmartClientMedium):
910
711
return SmartClientStreamMediumRequest(self)
913
"""We have been disconnected, reset current state.
915
This resets things like _current_request and connected state.
918
self._current_request = None
921
714
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
922
715
"""A client medium using simple pipes.
929
722
self._readable_pipe = readable_pipe
930
723
self._writeable_pipe = writeable_pipe
932
def _accept_bytes(self, data):
725
def _accept_bytes(self, bytes):
933
726
"""See SmartClientStreamMedium.accept_bytes."""
935
self._writeable_pipe.write(data)
937
if e.errno in (errno.EINVAL, errno.EPIPE):
938
raise errors.ConnectionReset(
939
"Error trying to write to subprocess", e)
941
self._report_activity(len(data), 'write')
727
self._writeable_pipe.write(bytes)
728
self._report_activity(len(bytes), 'write')
943
730
def _flush(self):
944
731
"""See SmartClientStreamMedium._flush()."""
945
# Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
946
# However, testing shows that even when the child process is
947
# gone, this doesn't error.
948
732
self._writeable_pipe.flush()
950
734
def _read_bytes(self, count):
951
735
"""See SmartClientStreamMedium._read_bytes."""
952
736
bytes_to_read = min(count, _MAX_READ_SIZE)
953
data = self._readable_pipe.read(bytes_to_read)
954
self._report_activity(len(data), 'read')
737
bytes = self._readable_pipe.read(bytes_to_read)
738
self._report_activity(len(bytes), 'read')
958
742
class SSHParams(object):
959
743
"""A set of parameters for starting a remote bzr via SSH."""
961
745
def __init__(self, host, port=None, username=None, password=None,
962
bzr_remote_path='bzr'):
746
bzr_remote_path='bzr'):
965
749
self.username = username
1000
784
maybe_port = ':%s' % self._ssh_params.port
1001
if self._ssh_params.username is None:
1004
maybe_user = '%s@' % self._ssh_params.username
1005
return "%s(%s://%s%s%s/)" % (
785
return "%s(%s://%s@%s%s/)" % (
1006
786
self.__class__.__name__,
788
self._ssh_params.username,
1009
789
self._ssh_params.host,
1033
813
vendor = self._vendor
1034
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
1035
self._ssh_params.password, self._ssh_params.host,
1036
self._ssh_params.port,
1037
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
1038
'--directory=/', '--allow-writes'])
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
818
'--directory=/', '--allow-writes'])
1039
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
1040
820
if io_kind == 'socket':
1041
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
1130
908
port = int(self._port)
1132
910
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
1133
socket.SOCK_STREAM, 0, 0)
1134
except socket.gaierror as xxx_todo_changeme:
1135
(err_num, err_msg) = xxx_todo_changeme.args
911
socket.SOCK_STREAM, 0, 0)
912
except socket.gaierror, (err_num, err_msg):
1136
913
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
1137
(self._host, port, err_msg))
914
(self._host, port, err_msg))
1138
915
# Initialize err in case there are no addresses returned:
1139
last_err = socket.error("no address found for %s" % self._host)
916
err = socket.error("no address found for %s" % self._host)
1140
917
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
1142
919
self._socket = socket.socket(family, socktype, proto)
1143
920
self._socket.setsockopt(socket.IPPROTO_TCP,
1144
921
socket.TCP_NODELAY, 1)
1145
922
self._socket.connect(sockaddr)
1146
except socket.error as err:
923
except socket.error, err:
1147
924
if self._socket is not None:
1148
925
self._socket.close()
1149
926
self._socket = None
1153
929
if self._socket is None:
1154
930
# socket errors either have a (string) or (errno, string) as their
1156
if isinstance(last_err.args, str):
1157
err_msg = last_err.args
932
if type(err.args) is str:
1159
err_msg = last_err.args[1]
935
err_msg = err.args[1]
1160
936
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1161
(self._host, port, err_msg))
937
(self._host, port, err_msg))
1162
938
self._connected = True
1163
for hook in transport.Transport.hooks["post_connect"]:
1167
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1168
942
"""A client medium for an already connected socket.
1170
944
Note that this class will assume it "owns" the socket, so it will close it
1171
945
when its disconnect method is called.