109
109
for start, length in offsets:
110
110
txt.append('%d,%d' % (start, length))
111
111
return '\n'.join(txt)
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
"""Server-side encoding and decoding logic for smart version 1."""
117
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
self._backing_transport = backing_transport
119
119
self._root_client_path = root_client_path
128
128
def accept_bytes(self, bytes):
129
129
"""Take bytes, and advance the internal state machine appropriately.
131
131
:param bytes: must be a byte string
133
133
if not isinstance(bytes, str):
412
412
self.chunks = collections.deque()
413
413
self.error = False
414
414
self.error_in_progress = None
416
416
def next_read_size(self):
417
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
# end-of-body marker is 4 bytes: 'END\n'.
506
506
self.chunks.append(self.chunk_in_progress)
507
507
self.chunk_in_progress = None
508
508
self.state_accept = self._state_accept_expecting_length
510
510
def _state_accept_reading_unused(self):
511
511
self.unused_data += self._get_in_buffer()
512
512
self._in_buffer_list = []
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
"""Decodes the length-prefixed bulk data."""
518
518
def __init__(self):
519
519
_StatefulDecoder.__init__(self)
520
520
self.state_accept = self._state_accept_expecting_length
521
521
self.state_read = self._state_read_no_data
523
523
self._trailer_buffer = ''
525
525
def next_read_size(self):
526
526
if self.bytes_left is not None:
527
527
# Ideally we want to read all the remainder of the body and the
564
564
self._body = self._body[:self.bytes_left]
565
565
self.bytes_left = None
566
566
self.state_accept = self._state_accept_reading_trailer
568
568
def _state_accept_reading_trailer(self):
569
569
self._trailer_buffer += self._get_in_buffer()
570
570
self._set_in_buffer(None)
574
574
self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
self.state_accept = self._state_accept_reading_unused
576
576
self.finished_reading = True
578
578
def _state_accept_reading_unused(self):
579
579
self.unused_data += self._get_in_buffer()
580
580
self._set_in_buffer(None)
655
655
if 'hpss' in debug.debug_flags:
656
656
mutter(' %d bytes in readv request', len(readv_bytes))
657
657
self._last_verb = args[0]
659
659
def call_with_body_stream(self, args, stream):
660
660
# Protocols v1 and v2 don't support body streams. So it's safe to
661
661
# assume that a v1/v2 server doesn't support whatever method we're
729
729
def _response_is_unknown_method(self, result_tuple):
730
730
"""Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
731
731
method' response to the request.
733
733
:param response: The response from a smart client call_expecting_body
735
735
:param verb: The verb used in that call.
742
742
# The response will have no body, so we've finished reading.
743
743
self._request.finished_reading()
744
744
raise errors.UnknownSmartMethod(self._last_verb)
746
746
def read_body_bytes(self, count=-1):
747
747
"""Read bytes from the body, decoding into a byte stream.
749
We read all bytes at once to ensure we've checked the trailer for
749
We read all bytes at once to ensure we've checked the trailer for
750
750
errors, and then feed the buffer back as read_body_bytes is called.
752
752
if self._body_buffer is not None:
791
791
def _write_protocol_version(self):
792
792
"""Write any prefixes this protocol requires.
794
794
Version one doesn't send protocol versions.
798
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
799
799
"""Version two of the client side of the smart protocol.
801
801
This prefixes the request with the value of REQUEST_VERSION_TWO.
985
985
self.message_handler.headers_received(decoded)
987
987
raise errors.SmartMessageHandlerError(sys.exc_info())
989
989
def _state_accept_expecting_message_part(self):
990
990
message_part_kind = self._extract_single_byte()
991
991
if message_part_kind == 'o':
1077
1077
for start, length in offsets:
1078
1078
txt.append('%d,%d' % (start, length))
1079
1079
return '\n'.join(txt)
1081
1081
def _write_protocol_version(self):
1082
1082
self._write_func(MESSAGE_VERSION_THREE)
1158
1158
if response.body is not None:
1159
1159
self._write_prefixed_body(response.body)
1160
1160
elif response.body_stream is not None:
1161
for chunk in response.body_stream:
1162
self._write_prefixed_body(chunk)
1161
for exc_info, chunk in _iter_with_errors(response.body_stream):
1162
if exc_info is not None:
1163
self._write_error_status()
1164
error_struct = request._translate_error(exc_info[1])
1165
self._write_structure(error_struct)
1168
self._write_prefixed_body(chunk)
1164
1170
self._write_end()
1173
def _iter_with_errors(iterable):
1174
"""Handle errors from iterable.next().
1178
for exc_info, value in _iter_with_errors(iterable):
1181
This is a safer alternative to::
1184
for value in iterable:
1189
Because the latter will catch errors from the for-loop body, not just
1192
If an error occurs, exc_info will be a exc_info tuple, and the generator
1193
will terminate. Otherwise exc_info will be None, and value will be the
1194
value from iterable.next(). Note that KeyboardInterrupt and SystemExit
1195
will not be itercepted.
1197
iterator = iter(iterable)
1200
yield None, iterator.next()
1201
except StopIteration:
1203
except (KeyboardInterrupt, SystemExit):
1206
yield sys.exc_info(), None
1167
1210
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1174
1217
def set_headers(self, headers):
1175
1218
self._headers = headers.copy()
1177
1220
def call(self, *args):
1178
1221
if 'hpss' in debug.debug_flags:
1179
1222
mutter('hpss call: %s', repr(args)[1:-1])
1242
1285
# have finished sending the stream. We would notice at the end
1243
1286
# anyway, but if the medium can deliver it early then it's good
1244
1287
# to short-circuit the whole request...
1288
for exc_info, part in _iter_with_errors(stream):
1289
if exc_info is not None:
1290
# Iterating the stream failed. Cleanly abort the request.
1291
self._write_error_status()
1292
# Currently the client unconditionally sends ('error',) as the
1294
self._write_structure(('error',))
1296
self._medium_request.finished_writing()
1297
raise exc_info[0], exc_info[1], exc_info[2]
1247
1299
self._write_prefixed_body(part)
1250
# Iterating the stream failed. Cleanly abort the request.
1251
self._write_error_status()
1252
# Currently the client unconditionally sends ('error',) as the
1254
self._write_structure(('error',))
1255
1301
self._write_end()
1256
1302
self._medium_request.finished_writing()