53
53
# We must not read any more than 64k at a time so we don't risk "no buffer
54
54
# space available" errors on some platforms. Windows in particular is likely
55
# to give error 10053 or 10055 if we read more than 64k from a socket.
55
# to throw WSAECONNABORTED or WSAENOBUFS if given too much data at once.
56
# Throughout this module buffer size parameters are either limited to be at
57
# most 64k, or are ignored and 64k is used instead.
56
58
_MAX_READ_SIZE = 64 * 1024
289
291
def _read_bytes(self, desired_count):
290
292
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
293
self.socket, _MAX_READ_SIZE, self._report_activity)
293
295
def terminate_due_to_error(self):
294
296
# TODO: This should log to a server log file, but no such thing
299
301
def _write_out(self, bytes):
300
302
tstart = osutils.timer_func()
301
osutils.send_all(self.socket, bytes, self._report_activity)
303
_send_bytes_chunked(self.socket, bytes, self._report_activity)
302
304
if 'hpss' in debug.debug_flags:
303
305
thread_id = thread.get_ident()
304
306
trace.mutter('%12s: [%s] %d bytes to the socket in %.3fs'
838
840
def _accept_bytes(self, bytes):
839
841
"""See SmartClientMedium.accept_bytes."""
840
842
self._ensure_connection()
841
osutils.send_all(self._socket, bytes, self._report_activity)
843
_send_bytes_chunked(self._socket, bytes, self._report_activity)
843
845
def disconnect(self):
844
846
"""See SmartClientMedium.disconnect()."""
899
901
if not self._connected:
900
902
raise errors.MediumNotConnected(self)
901
903
return _read_bytes_from_socket(
902
self._socket.recv, count, self._report_activity)
904
self._socket, _MAX_READ_SIZE, self._report_activity)
905
907
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
945
947
def _read_bytes_from_socket(sock, desired_count, report_activity):
946
# We ignore the desired_count because on sockets it's more efficient to
947
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
949
bytes = sock(_MAX_READ_SIZE)
950
except socket.error, e:
951
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
952
# The connection was closed by the other side. Callers expect an
953
# empty string to signal end-of-stream.
958
report_activity(len(bytes), 'read')
948
"""Read up to desired_count bytes from sock and notify of progress
950
Translates "Connection reset by peer" into a normal disconnect, and
951
repeats the recv if interrupted by a signal.
955
bytes = sock.recv(desired_count)
956
except socket.error, e:
958
if eno == getattr(errno, "WSAECONNRESET", errno.ECONNRESET):
959
# The connection was closed by the other side. Callers expect an
960
# empty string to signal end-of-stream.
962
if eno != errno.EINTR:
965
report_activity(len(bytes), 'read')
968
def _send_bytes_chunked(sock, bytes, report_activity):
969
"""Send bytes on sock and notify of progress
971
Breaks large blocks into smaller chunks to avoid buffering limitations on
972
some platforms, and catches EINTR which may be thrown if the send is
973
interrupted by a signal.
975
For a socket implementation without these issues, this function has much
978
report_activity(len(bytes), 'write')
981
byte_count = len(bytes)
982
while sent_total < byte_count:
984
sent = sock.send(buffer(bytes, sent_total, _MAX_READ_SIZE))
985
except socket.error, e:
986
if e.args[0] != errno.EINTR:
990
report_activity(sent, 'write')