162
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
    """Notify that this medium has activity.

    Implementations should call this from all methods that actually do IO.
    Be careful that it's not called twice, if one method is implemented on
    top of another.

    :param bytes: Number of bytes read or written.
    :param direction: 'read' or 'write' or None.
    """
    # The medium itself is passed along so the UI can tell which transport
    # the activity belongs to.
    ui.ui_factory.report_transport_activity(self, bytes, direction)
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
"""Handles smart commands coming over a stream.
274
287
def _read_bytes(self, desired_count):
    """Read one chunk from the socket and report it as 'read' activity.

    :param desired_count: Ignored; see the comment below.
    :return: Whatever the single ``recv`` call produced.
    """
    # We ignore the desired_count because on sockets it's more efficient to
    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
    # NOTE(review): until_no_eintr presumably retries the call when it is
    # interrupted by EINTR -- confirm against osutils.
    data = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
    # Report before returning; returning the recv result directly would
    # leave this line unreachable and the read would never be counted.
    self._report_activity(len(data), 'read')
    return data
279
294
def terminate_due_to_error(self):
280
295
# TODO: This should log to a server log file, but no such thing
283
298
self.finished = True
285
300
def _write_out(self, bytes):
    """Send ``bytes`` over this medium's socket, exactly once.

    :param bytes: The data to send.
    """
    # self._report_activity is handed to send_all as its third argument.
    # NOTE(review): presumably send_all invokes it as data is written, so
    # this method must not report the activity a second time -- confirm
    # against osutils.send_all.
    osutils.send_all(self.socket, bytes, self._report_activity)
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
689
704
def _accept_bytes(self, bytes):
690
705
"""See SmartClientStreamMedium.accept_bytes."""
691
706
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
693
709
def _flush(self):
694
710
"""See SmartClientStreamMedium._flush()."""
697
713
def _read_bytes(self, count):
698
714
"""See SmartClientStreamMedium._read_bytes."""
699
return self._readable_pipe.read(count)
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
702
720
class SmartSSHClientMedium(SmartClientStreamMedium):
709
727
:param vendor: An optional override for the ssh vendor to use. See
710
728
bzrlib.transport.ssh for details on ssh vendors.
712
SmartClientStreamMedium.__init__(self, base)
713
730
self._connected = False
714
731
self._host = host
715
732
self._password = password
716
733
self._port = port
717
734
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
718
739
self._read_from = None
719
740
self._ssh_connection = None
720
741
self._vendor = vendor
721
742
self._write_to = None
722
743
self._bzr_remote_path = bzr_remote_path
723
if self._bzr_remote_path is None:
724
symbol_versioning.warn(
725
'bzr_remote_path is required as of bzr 0.92',
726
DeprecationWarning, stacklevel=2)
727
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
744
# for the benefit of progress making a short description of this
746
self._scheme = 'bzr+ssh'
749
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
750
self.__class__.__name__,
729
756
def _accept_bytes(self, bytes):
730
757
"""See SmartClientStreamMedium.accept_bytes."""
731
758
self._ensure_connection()
732
759
self._write_to.write(bytes)
760
self._report_activity(len(bytes), 'write')
734
762
def disconnect(self):
735
763
"""See SmartClientMedium.disconnect()."""
765
793
if not self._connected:
766
794
raise errors.MediumNotConnected(self)
767
795
bytes_to_read = min(count, _MAX_READ_SIZE)
768
return self._read_from.read(bytes_to_read)
796
bytes = self._read_from.read(bytes_to_read)
797
self._report_activity(len(bytes), 'read')
771
801
# Port 4155 is the default port for bzr://, registered with IANA.
787
817
def _accept_bytes(self, bytes):
    """Send ``bytes`` over the socket, exactly once.

    See SmartClientMedium.accept_bytes.

    :param bytes: The data to send.
    """
    self._ensure_connection()
    # self._report_activity is handed to send_all as its third argument.
    # NOTE(review): presumably send_all invokes it as data is written, so
    # no separate report is made here -- confirm against osutils.send_all.
    osutils.send_all(self._socket, bytes, self._report_activity)
792
822
def disconnect(self):
793
823
"""See SmartClientMedium.disconnect()."""
850
880
# We ignore the desired_count because on sockets it's more efficient to
851
881
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
853
return self._socket.recv(_MAX_READ_SIZE)
883
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
854
884
except socket.error, e:
855
885
if len(e.args) and e.args[0] == errno.ECONNRESET:
856
886
# Callers expect an empty string in that case
891
self._report_activity(len(bytes), 'read')
862
895
class SmartClientStreamMediumRequest(SmartClientMediumRequest):