29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
from bzrlib.util.bencode import bdecode, bencode
32
from bzrlib.util.bencode import bdecode_as_tuple, bencode
35
35
# Protocol version strings. These are sent as prefixes of bzr requests and
109
109
for start, length in offsets:
110
110
txt.append('%d,%d' % (start, length))
111
111
return '\n'.join(txt)
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
"""Server-side encoding and decoding logic for smart version 1."""
117
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
self._backing_transport = backing_transport
119
119
self._root_client_path = root_client_path
128
128
def accept_bytes(self, bytes):
129
129
"""Take bytes, and advance the internal state machine appropriately.
131
131
:param bytes: must be a byte string
133
133
if not isinstance(bytes, str):
412
412
self.chunks = collections.deque()
413
413
self.error = False
414
414
self.error_in_progress = None
416
416
def next_read_size(self):
417
417
# Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
# end-of-body marker is 4 bytes: 'END\n'.
506
506
self.chunks.append(self.chunk_in_progress)
507
507
self.chunk_in_progress = None
508
508
self.state_accept = self._state_accept_expecting_length
510
510
def _state_accept_reading_unused(self):
511
511
self.unused_data += self._get_in_buffer()
512
512
self._in_buffer_list = []
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
"""Decodes the length-prefixed bulk data."""
518
518
def __init__(self):
519
519
_StatefulDecoder.__init__(self)
520
520
self.state_accept = self._state_accept_expecting_length
521
521
self.state_read = self._state_read_no_data
523
523
self._trailer_buffer = ''
525
525
def next_read_size(self):
526
526
if self.bytes_left is not None:
527
527
# Ideally we want to read all the remainder of the body and the
564
564
self._body = self._body[:self.bytes_left]
565
565
self.bytes_left = None
566
566
self.state_accept = self._state_accept_reading_trailer
568
568
def _state_accept_reading_trailer(self):
569
569
self._trailer_buffer += self._get_in_buffer()
570
570
self._set_in_buffer(None)
574
574
self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
self.state_accept = self._state_accept_reading_unused
576
576
self.finished_reading = True
578
578
def _state_accept_reading_unused(self):
579
579
self.unused_data += self._get_in_buffer()
580
580
self._set_in_buffer(None)
656
656
mutter(' %d bytes in readv request', len(readv_bytes))
657
657
self._last_verb = args[0]
659
def call_with_body_stream(self, args, stream):
    """Refuse a request whose body would be sent as a stream.

    Protocol versions 1 and 2 cannot carry body streams, so a server
    speaking one of them is assumed not to support any method that needs
    one.  The underlying request is marked as finished for both writing
    and reading, and the call is then reported as an unknown method.

    :param args: the request arguments; ``args[0]`` is the verb.
    :param stream: the body stream; it is never consumed here.
    :raises errors.UnknownSmartMethod: naming ``args[0]``, once the
        request has been closed out.
    """
    medium_request = self._request
    # Close the request in both directions before reporting the failure.
    medium_request.finished_writing()
    medium_request.finished_reading()
    verb = args[0]
    raise errors.UnknownSmartMethod(verb)
659
667
def cancel_read_body(self):
660
668
"""After expecting a body, a response code may indicate one otherwise.
721
729
def _response_is_unknown_method(self, result_tuple):
722
730
"""Raise UnexpectedSmartServerResponse if the response is an 'unknown
723
731
method' response to the request.
725
733
:param response: The response from a smart client call_expecting_body
727
735
:param verb: The verb used in that call.
734
742
# The response will have no body, so we've finished reading.
735
743
self._request.finished_reading()
736
744
raise errors.UnknownSmartMethod(self._last_verb)
738
746
def read_body_bytes(self, count=-1):
739
747
"""Read bytes from the body, decoding into a byte stream.
741
We read all bytes at once to ensure we've checked the trailer for
749
We read all bytes at once to ensure we've checked the trailer for
742
750
errors, and then feed the buffer back as read_body_bytes is called.
744
752
if self._body_buffer is not None:
783
791
def _write_protocol_version(self):
784
792
"""Write any prefixes this protocol requires.
786
794
Version one doesn't send protocol versions.
790
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
791
799
"""Version two of the client side of the smart protocol.
793
801
This prefixes the request with the value of REQUEST_VERSION_TWO.
931
939
def _extract_prefixed_bencoded_data(self):
932
940
prefixed_bytes = self._extract_length_prefixed_bytes()
934
decoded = bdecode(prefixed_bytes)
942
decoded = bdecode_as_tuple(prefixed_bytes)
935
943
except ValueError:
936
944
raise errors.SmartProtocolError(
937
945
'Bytes %r not bencoded' % (prefixed_bytes,))
977
985
self.message_handler.headers_received(decoded)
979
987
raise errors.SmartMessageHandlerError(sys.exc_info())
981
989
def _state_accept_expecting_message_part(self):
982
990
message_part_kind = self._extract_single_byte()
983
991
if message_part_kind == 'o':
1069
1077
for start, length in offsets:
1070
1078
txt.append('%d,%d' % (start, length))
1071
1079
return '\n'.join(txt)
1073
1081
def _write_protocol_version(self):
1074
1082
self._write_func(MESSAGE_VERSION_THREE)
1163
1174
def set_headers(self, headers):
    """Remember a private copy of ``headers`` on this object.

    :param headers: an object providing ``copy()``; the result of that
        call is stored as ``self._headers``, so the caller's object is
        not the one retained.
    """
    headers_snapshot = headers.copy()
    self._headers = headers_snapshot
1166
1177
def call(self, *args):
1167
1178
if 'hpss' in debug.debug_flags:
1168
1179
mutter('hpss call: %s', repr(args)[1:-1])
1217
1228
self._write_end()
1218
1229
self._medium_request.finished_writing()
1231
def call_with_body_stream(self, args, stream):
1232
if 'hpss' in debug.debug_flags:
1233
mutter('hpss call w/body stream: %r', args)
1234
path = getattr(self._medium_request._medium, '_path', None)
1235
if path is not None:
1236
mutter(' (to %s)', path)
1237
self._request_start_time = time.time()
1238
self._write_protocol_version()
1239
self._write_headers(self._headers)
1240
self._write_structure(args)
1241
# TODO: notice if the server has sent an early error reply before we
1242
# have finished sending the stream. We would notice at the end
1243
# anyway, but if the medium can deliver it early then it's good
1244
# to short-circuit the whole request...
1247
self._write_prefixed_body(part)
1250
exc_info = sys.exc_info()
1251
# Iterating the stream failed. Cleanly abort the request.
1252
self._write_error_status()
1253
# Currently the client unconditionally sends ('error',) as the
1255
self._write_structure(('error',))
1257
self._medium_request.finished_writing()
1258
raise exc_info[0], exc_info[1], exc_info[2]
1260
self._medium_request.finished_writing()