88
89
def _get_line(read_bytes_func):
89
90
"""Read bytes using read_bytes_func until a newline byte.
91
92
This isn't particularly efficient, so should only be used when the
92
93
expected size of the line is quite short.
94
95
:returns: a tuple of two strs: (line, excess)
161
162
line, excess = _get_line(self.read_bytes)
162
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
    """Notify that this medium has activity.

    Implementations should call this from all methods that actually do IO.
    Be careful that it's not called twice for the same bytes, e.g. when one
    method is implemented on top of another.

    :param bytes: Number of bytes read or written.
    :param direction: 'read' or 'write' or None.
    """
    # Forward to the UI factory, handing over this medium together with the
    # byte count and direction.
    ui.ui_factory.report_transport_activity(self, bytes, direction)
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
"""Handles smart commands coming over a stream.
172
185
One instance is created for each connected client; it can serve multiple
173
186
requests in the lifetime of the connection.
175
The server passes requests through to an underlying backing transport,
188
The server passes requests through to an underlying backing transport,
176
189
which will typically be a LocalTransport looking at the server's filesystem.
178
191
:ivar _push_back_buffer: a str of bytes that have been read from the stream
268
281
self.finished = True
270
283
protocol.accept_bytes(bytes)
272
285
self._push_back(protocol.unused_data)
274
287
def _read_bytes(self, desired_count):
    """Read one chunk from the socket and report it as 'read' activity.

    :param desired_count: Ignored; see the comment below.
    :returns: the bytes produced by the socket's recv call.
    """
    # We ignore the desired_count because on sockets it's more efficient to
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
    # NOTE(review): until_no_eintr presumably retries recv when it is
    # interrupted by EINTR -- confirm against osutils.
    data = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
    # Report before returning; an early return here would make the
    # activity notification unreachable.
    self._report_activity(len(data), 'read')
    return data
279
294
def terminate_due_to_error(self):
280
295
# TODO: This should log to a server log file, but no such thing
283
298
self.finished = True
285
300
def _write_out(self, bytes):
    """Send bytes over self.socket, reporting the write activity.

    :param bytes: The data to send.
    """
    # Send exactly once. self._report_activity is handed to send_all,
    # presumably so it can report bytes as they are written -- confirm
    # against osutils.send_all.
    osutils.send_all(self.socket, bytes, self._report_activity)
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
350
365
request.finished_reading()
352
367
It is up to the individual SmartClientMedium whether multiple concurrent
353
requests can exist. See SmartClientMedium.get_request to obtain instances
354
of SmartClientMediumRequest, and the concrete Medium you are using for
368
requests can exist. See SmartClientMedium.get_request to obtain instances
369
of SmartClientMediumRequest, and the concrete Medium you are using for
355
370
details on concurrency and pipelining.
403
418
def _finished_reading(self):
404
419
"""Helper for finished_reading.
406
finished_reading checks the state of the request to determine if
421
finished_reading checks the state of the request to determine if
407
422
finished_reading is allowed, and if it is hands off to _finished_reading
408
423
to perform the action.
423
438
def _finished_writing(self):
424
439
"""Helper for finished_writing.
426
finished_writing checks the state of the request to determine if
441
finished_writing checks the state of the request to determine if
427
442
finished_writing is allowed, and if it is hands off to _finished_writing
428
443
to perform the action.
460
475
if not line.endswith('\n'):
461
476
# end of file encountered reading from server
462
477
raise errors.ConnectionReset(
463
"please check connectivity and permissions",
464
"(and try -Dhpss if further diagnosis is required)")
478
"please check connectivity and permissions")
467
481
def _read_line(self):
468
482
"""Helper for SmartClientMediumRequest.read_line.
470
484
By default this forwards to self._medium._get_line because we are
471
485
operating on the medium's stream.
518
532
trace.note('HPSS calls: %d %s', count, medium_repr)
520
534
def flush_all(self):
521
535
for ref in list(self.counts.keys()):
524
538
_debug_counter = None
527
541
class SmartClientMedium(SmartMedium):
528
542
"""Smart client is a medium for sending smart protocol requests over."""
575
589
if (self._remote_version_is_before is not None and
576
590
version_tuple > self._remote_version_is_before):
591
# We have been told that the remote side is older than some version
592
# which is newer than a previously supplied older-than version.
593
# This indicates that some smart verb call is not guarded
594
# appropriately (it should simply not have been tried).
577
595
raise AssertionError(
578
596
"_remember_remote_is_before(%r) called, but "
579
597
"_remember_remote_is_before(%r) was called previously."
618
636
def disconnect(self):
619
637
"""If this medium maintains a persistent connection, close it.
621
639
The default implementation does nothing.
624
642
def remote_path_from_transport(self, transport):
625
643
"""Convert transport into a path suitable for using in a request.
627
645
Note that the resulting remote path doesn't encode the host name or
628
646
anything but path, so it is only safe to use it in requests sent over
629
647
the medium from the matching transport.
686
704
def _accept_bytes(self, bytes):
687
705
"""See SmartClientStreamMedium.accept_bytes."""
688
706
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
690
709
def _flush(self):
691
710
"""See SmartClientStreamMedium._flush()."""
694
713
def _read_bytes(self, count):
695
714
"""See SmartClientStreamMedium._read_bytes."""
696
return self._readable_pipe.read(count)
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
699
720
class SmartSSHClientMedium(SmartClientStreamMedium):
700
721
"""A client medium using SSH."""
702
723
def __init__(self, host, port=None, username=None, password=None,
703
724
base=None, vendor=None, bzr_remote_path=None):
704
725
"""Creates a client that will connect on the first use.
706
727
:param vendor: An optional override for the ssh vendor to use. See
707
728
bzrlib.transport.ssh for details on ssh vendors.
709
SmartClientStreamMedium.__init__(self, base)
710
730
self._connected = False
711
731
self._host = host
712
732
self._password = password
713
733
self._port = port
714
734
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
715
739
self._read_from = None
716
740
self._ssh_connection = None
717
741
self._vendor = vendor
718
742
self._write_to = None
719
743
self._bzr_remote_path = bzr_remote_path
720
if self._bzr_remote_path is None:
721
symbol_versioning.warn(
722
'bzr_remote_path is required as of bzr 0.92',
723
DeprecationWarning, stacklevel=2)
724
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
750
self.__class__.__name__,
726
756
def _accept_bytes(self, bytes):
727
757
"""See SmartClientStreamMedium.accept_bytes."""
728
758
self._ensure_connection()
729
759
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
731
762
def disconnect(self):
732
763
"""See SmartClientMedium.disconnect()."""
762
793
if not self._connected:
763
794
raise errors.MediumNotConnected(self)
764
795
bytes_to_read = min(count, _MAX_READ_SIZE)
765
return self._read_from.read(bytes_to_read)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
768
801
# Port 4155 is the default port for bzr://, registered with IANA.
773
806
class SmartTCPClientMedium(SmartClientStreamMedium):
774
807
"""A client medium using TCP."""
776
809
def __init__(self, host, port, base):
777
810
"""Creates a client that will connect on the first use."""
778
811
SmartClientStreamMedium.__init__(self, base)
784
817
def _accept_bytes(self, bytes):
    """See SmartClientMedium.accept_bytes.

    Ensures the connection exists and then sends the data over
    self._socket.

    :param bytes: The data to send.
    """
    self._ensure_connection()
    # Send exactly once. self._report_activity is handed to send_all,
    # presumably so it can report bytes as they are written -- confirm
    # against osutils.send_all.
    osutils.send_all(self._socket, bytes, self._report_activity)
789
822
def disconnect(self):
790
823
"""See SmartClientMedium.disconnect()."""
804
837
port = int(self._port)
806
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
839
sockaddrs = socket.getaddrinfo(self._host, port, socket.AF_UNSPEC,
807
840
socket.SOCK_STREAM, 0, 0)
808
841
except socket.gaierror, (err_num, err_msg):
809
842
raise errors.ConnectionError("failed to lookup %s:%d: %s" %
813
846
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
815
848
self._socket = socket.socket(family, socktype, proto)
816
self._socket.setsockopt(socket.IPPROTO_TCP,
849
self._socket.setsockopt(socket.IPPROTO_TCP,
817
850
socket.TCP_NODELAY, 1)
818
851
self._socket.connect(sockaddr)
819
852
except socket.error, err:
847
880
# We ignore the desired_count because on sockets it's more efficient to
848
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
850
return self._socket.recv(_MAX_READ_SIZE)
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
851
884
except socket.error, e:
852
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
853
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
859
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
881
917
def _finished_reading(self):
882
918
"""See SmartClientMediumRequest._finished_reading.
884
This clears the _current_request on self._medium to allow a new
920
This clears the _current_request on self._medium to allow a new
885
921
request to be created.
887
923
if self._medium._current_request is not self:
888
924
raise AssertionError()
889
925
self._medium._current_request = None
891
927
def _finished_writing(self):
892
928
"""See SmartClientMediumRequest._finished_writing.