45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024
45
51
def _get_protocol_factory_for_bytes(bytes):
46
52
"""Determine the right protocol factory for 'bytes'.
74
80
return protocol_factory, bytes
77
class SmartServerStreamMedium(object):
83
class SmartMedium(object):
84
"""Base class for smart protocol media, both client- and server-side."""
87
self._push_back_buffer = None
89
def _push_back(self, bytes):
90
"""Return unused bytes to the medium, because they belong to the next
93
This sets the _push_back_buffer to the given bytes.
95
if self._push_back_buffer is not None:
97
"_push_back called when self._push_back_buffer is %r"
98
% (self._push_back_buffer,))
101
self._push_back_buffer = bytes
103
def _get_push_back_buffer(self):
104
if self._push_back_buffer == '':
105
raise AssertionError(
106
'%s._push_back_buffer should never be the empty string, '
107
'which can be confused with EOF' % (self,))
108
bytes = self._push_back_buffer
109
self._push_back_buffer = None
112
def read_bytes(self, desired_count):
113
"""Read some bytes from this medium.
115
:returns: some bytes, possibly more or less than the number requested
116
in 'desired_count' depending on the medium.
118
if self._push_back_buffer is not None:
119
return self._get_push_back_buffer()
120
bytes_to_read = min(desired_count, _MAX_READ_SIZE)
121
return self._read_bytes(bytes_to_read)
123
def _read_bytes(self, count):
124
raise NotImplementedError(self._read_bytes)
127
"""Read bytes from this request's response until a newline byte.
129
This isn't particularly efficient, so should only be used when the
130
expected size of the line is quite short.
132
:returns: a string of bytes ending in a newline (byte 0x0A).
136
while newline_pos == -1:
137
new_bytes = self.read_bytes(1)
140
# Ran out of bytes before receiving a complete line.
142
newline_pos = bytes.find('\n')
143
line = bytes[:newline_pos+1]
144
self._push_back(bytes[newline_pos+1:])
148
class SmartServerStreamMedium(SmartMedium):
78
149
"""Handles smart commands coming over a stream.
80
151
The stream may be a pipe connected to sshd, or a tcp socket, or an
101
172
self.backing_transport = backing_transport
102
173
self.root_client_path = root_client_path
103
174
self.finished = False
104
self._push_back_buffer = None
106
def _push_back(self, bytes):
107
"""Return unused bytes to the medium, because they belong to the next
110
This sets the _push_back_buffer to the given bytes.
112
if self._push_back_buffer is not None:
113
raise AssertionError(
114
"_push_back called when self._push_back_buffer is %r"
115
% (self._push_back_buffer,))
118
self._push_back_buffer = bytes
120
def _get_push_back_buffer(self):
121
if self._push_back_buffer == '':
122
raise AssertionError(
123
'%s._push_back_buffer should never be the empty string, '
124
'which can be confused with EOF' % (self,))
125
bytes = self._push_back_buffer
126
self._push_back_buffer = None
175
SmartMedium.__init__(self)
130
178
"""Serve requests until the client disconnects."""
171
219
"""Called when an unhandled exception from the protocol occurs."""
172
220
raise NotImplementedError(self.terminate_due_to_error)
174
def _get_bytes(self, desired_count):
222
def _read_bytes(self, desired_count):
175
223
"""Get some bytes from the medium.
177
225
:param desired_count: number of bytes we want to read.
179
raise NotImplementedError(self._get_bytes)
182
"""Read bytes from this request's response until a newline byte.
184
This isn't particularly efficient, so should only be used when the
185
expected size of the line is quite short.
187
:returns: a string of bytes ending in a newline (byte 0x0A).
191
while newline_pos == -1:
192
new_bytes = self._get_bytes(1)
195
# Ran out of bytes before receiving a complete line.
197
newline_pos = bytes.find('\n')
198
line = bytes[:newline_pos+1]
199
self._push_back(bytes[newline_pos+1:])
227
raise NotImplementedError(self._read_bytes)
203
230
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
216
243
def _serve_one_request_unguarded(self, protocol):
217
244
while protocol.next_read_size():
218
bytes = self._get_bytes(4096)
245
# We can safely try to read large chunks. If there is less data
246
# than _MAX_READ_SIZE ready, the socket will just return a short
247
# read immediately rather than block.
248
bytes = self.read_bytes(_MAX_READ_SIZE)
220
250
self.finished = True
224
254
self._push_back(protocol.unused_data)
226
def _get_bytes(self, desired_count):
227
if self._push_back_buffer is not None:
228
return self._get_push_back_buffer()
256
def _read_bytes(self, desired_count):
229
257
# We ignore the desired_count because on sockets it's more efficient to
231
return self.socket.recv(4096)
258
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
259
return self.socket.recv(_MAX_READ_SIZE)
233
261
def terminate_due_to_error(self):
234
262
# TODO: This should log to a server log file, but no such thing
235
263
# exists yet. Andrew Bennetts 2006-09-29.
263
291
def _serve_one_request_unguarded(self, protocol):
293
# We need to be careful not to read past the end of the current
294
# request, or else the read from the pipe will block, so we use
295
# protocol.next_read_size().
265
296
bytes_to_read = protocol.next_read_size()
266
297
if bytes_to_read == 0:
267
298
# Finished serving this request.
268
299
self._out.flush()
270
bytes = self._get_bytes(bytes_to_read)
301
bytes = self.read_bytes(bytes_to_read)
272
303
# Connection has been closed.
273
304
self.finished = True
276
307
protocol.accept_bytes(bytes)
278
def _get_bytes(self, desired_count):
279
if self._push_back_buffer is not None:
280
return self._get_push_back_buffer()
309
def _read_bytes(self, desired_count):
281
310
return self._in.read(desired_count)
283
312
def terminate_due_to_error(self):
397
426
return self._read_bytes(count)
399
428
def _read_bytes(self, count):
400
"""Helper for read_bytes.
429
"""Helper for SmartClientMediumRequest.read_bytes.
402
431
read_bytes checks the state of the request to determine if bytes
403
432
should be read. After that it hands off to _read_bytes to do the
435
By default this forwards to self._medium.read_bytes because we are
436
operating on the medium's stream.
406
raise NotImplementedError(self._read_bytes)
438
return self._medium.read_bytes(count)
408
440
def read_line(self):
409
"""Read bytes from this request's response until a newline byte.
411
This isn't particularly efficient, so should only be used when the
412
expected size of the line is quite short.
414
:returns: a string of bytes ending in a newline (byte 0x0A).
416
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
418
while not line or line[-1] != '\n':
419
new_char = self.read_bytes(1)
422
# end of file encountered reading from server
423
raise errors.ConnectionReset(
424
"please check connectivity and permissions",
425
"(and try -Dhpss if further diagnosis is required)")
441
line = self._medium._get_line()
442
if not line.endswith('\n'):
443
# end of file encountered reading from server
444
raise errors.ConnectionReset(
445
"please check connectivity and permissions",
446
"(and try -Dhpss if further diagnosis is required)")
429
class SmartClientMedium(object):
450
class SmartClientMedium(SmartMedium):
430
451
"""Smart client is a medium for sending smart protocol requests over."""
432
453
def __init__(self, base):
660
678
"""See SmartClientStreamMedium.read_bytes."""
661
679
if not self._connected:
662
680
raise errors.MediumNotConnected(self)
663
return self._read_from.read(count)
681
bytes_to_read = min(count, _MAX_READ_SIZE)
682
return self._read_from.read(bytes_to_read)
666
685
# Port 4155 is the default port for bzr://, registered with IANA.
726
745
"""See SmartClientMedium.read_bytes."""
727
746
if not self._connected:
728
747
raise errors.MediumNotConnected(self)
729
return self._socket.recv(count)
748
# We ignore the requested count because on sockets it's more efficient to
749
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
750
return self._socket.recv(_MAX_READ_SIZE)
732
753
class SmartClientStreamMediumRequest(SmartClientMediumRequest):