162
163
self._push_back(excess)
166
def _report_activity(self, bytes, direction):
167
"""Notify that this medium has activity.
169
Implementations should call this from all methods that actually do IO.
170
Be careful that it's not called twice, if one method is implemented on
173
:param bytes: Number of bytes read or written.
174
:param direction: 'read' or 'write' or None.
176
ui.ui_factory.report_transport_activity(self, bytes, direction)
166
179
class SmartServerStreamMedium(SmartMedium):
167
180
"""Handles smart commands coming over a stream.
274
287
def _read_bytes(self, desired_count):
275
288
# We ignore the desired_count because on sockets it's more efficient to
276
289
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
277
return osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
290
bytes = osutils.until_no_eintr(self.socket.recv, _MAX_READ_SIZE)
291
self._report_activity(len(bytes), 'read')
279
294
def terminate_due_to_error(self):
280
295
# TODO: This should log to a server log file, but no such thing
283
298
self.finished = True
285
300
def _write_out(self, bytes):
286
osutils.send_all(self.socket, bytes)
301
osutils.send_all(self.socket, bytes, self._report_activity)
289
304
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
689
704
def _accept_bytes(self, bytes):
690
705
"""See SmartClientStreamMedium.accept_bytes."""
691
706
self._writeable_pipe.write(bytes)
707
self._report_activity(len(bytes), 'write')
693
709
def _flush(self):
694
710
"""See SmartClientStreamMedium._flush()."""
697
713
def _read_bytes(self, count):
698
714
"""See SmartClientStreamMedium._read_bytes."""
699
return self._readable_pipe.read(count)
715
bytes = self._readable_pipe.read(count)
716
self._report_activity(len(bytes), 'read')
702
720
class SmartSSHClientMedium(SmartClientStreamMedium):
709
727
:param vendor: An optional override for the ssh vendor to use. See
710
728
bzrlib.transport.ssh for details on ssh vendors.
712
SmartClientStreamMedium.__init__(self, base)
713
730
self._connected = False
714
731
self._host = host
715
732
self._password = password
716
733
self._port = port
717
734
self._username = username
735
# SmartClientStreamMedium stores the repr of this object in its
736
# _DebugCounter so we have to store all the values used in our repr
737
# method before calling the super init.
738
SmartClientStreamMedium.__init__(self, base)
718
739
self._read_from = None
719
740
self._ssh_connection = None
720
741
self._vendor = vendor
721
742
self._write_to = None
722
743
self._bzr_remote_path = bzr_remote_path
723
if self._bzr_remote_path is None:
724
symbol_versioning.warn(
725
'bzr_remote_path is required as of bzr 0.92',
726
DeprecationWarning, stacklevel=2)
727
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
746
return "%s(connected=%r, username=%r, host=%r, port=%r)" % (
747
self.__class__.__name__,
729
753
def _accept_bytes(self, bytes):
730
754
"""See SmartClientStreamMedium.accept_bytes."""
731
755
self._ensure_connection()
732
756
self._write_to.write(bytes)
757
self._report_activity(len(bytes), 'write')
734
759
def disconnect(self):
735
760
"""See SmartClientMedium.disconnect()."""
765
790
if not self._connected:
766
791
raise errors.MediumNotConnected(self)
767
792
bytes_to_read = min(count, _MAX_READ_SIZE)
768
return self._read_from.read(bytes_to_read)
793
bytes = self._read_from.read(bytes_to_read)
794
self._report_activity(len(bytes), 'read')
771
798
# Port 4155 is the default port for bzr://, registered with IANA.
787
814
def _accept_bytes(self, bytes):
788
815
"""See SmartClientMedium.accept_bytes."""
789
816
self._ensure_connection()
790
osutils.send_all(self._socket, bytes)
817
osutils.send_all(self._socket, bytes, self._report_activity)
792
819
def disconnect(self):
793
820
"""See SmartClientMedium.disconnect()."""
850
877
# We ignore the desired_count because on sockets it's more efficient to
851
878
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
853
return self._socket.recv(_MAX_READ_SIZE)
880
bytes = osutils.until_no_eintr(self._socket.recv, _MAX_READ_SIZE)
854
881
except socket.error, e:
855
882
if len(e.args) and e.args[0] == errno.ECONNRESET:
856
883
# Callers expect an empty string in that case
888
self._report_activity(len(bytes), 'read')
862
892
class SmartClientStreamMediumRequest(SmartClientMediumRequest):