120
105
for start, length in offsets:
121
106
txt.append('%d,%d' % (start, length))
122
107
return '\n'.join(txt)
125
110
class SmartServerRequestProtocolOne(SmartProtocolBase):
126
111
"""Server-side encoding and decoding logic for smart version 1."""
128
def __init__(self, backing_transport, write_func, root_client_path='/',
113
def __init__(self, backing_transport, write_func, root_client_path='/'):
130
114
self._backing_transport = backing_transport
131
115
self._root_client_path = root_client_path
132
self._jail_root = jail_root
133
116
self.unused_data = ''
134
117
self._finished = False
135
118
self.in_buffer = ''
136
self._has_dispatched = False
119
self.has_dispatched = False
137
120
self.request = None
138
121
self._body_decoder = None
139
122
self._write_func = write_func
141
124
def accept_bytes(self, bytes):
142
125
"""Take bytes, and advance the internal state machine appropriately.
144
127
:param bytes: must be a byte string
146
129
if not isinstance(bytes, str):
147
130
raise ValueError(bytes)
148
131
self.in_buffer += bytes
149
if not self._has_dispatched:
132
if not self.has_dispatched:
150
133
if '\n' not in self.in_buffer:
151
134
# no command line yet
153
self._has_dispatched = True
136
self.has_dispatched = True
155
138
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
156
139
first_line += '\n'
157
140
req_args = _decode_tuple(first_line)
158
141
self.request = request.SmartServerRequestHandler(
159
142
self._backing_transport, commands=request.request_handlers,
160
root_client_path=self._root_client_path,
161
jail_root=self._jail_root)
162
self.request.args_received(req_args)
143
root_client_path=self._root_client_path)
144
self.request.dispatch_command(req_args[0], req_args[1:])
163
145
if self.request.finished_reading:
164
146
# trivial request
165
147
self.unused_data = self.in_buffer
318
300
def __init__(self, count=None):
321
:param count: the total number of bytes needed by the current state.
322
May be None if the number of bytes needed is unknown.
324
301
self.count = count
327
304
class _StatefulDecoder(object):
328
"""Base class for writing state machines to decode byte streams.
330
Subclasses should provide a self.state_accept attribute that accepts bytes
331
and, if appropriate, updates self.state_accept to a different function.
332
accept_bytes will call state_accept as often as necessary to make sure the
333
state machine has progressed as far as possible before it returns.
335
See ProtocolThreeDecoder for an example subclass.
338
306
def __init__(self):
339
307
self.finished_reading = False
340
self._in_buffer_list = []
341
self._in_buffer_len = 0
342
308
self.unused_data = ''
343
309
self.bytes_left = None
344
310
self._number_needed_bytes = None
346
def _get_in_buffer(self):
347
if len(self._in_buffer_list) == 1:
348
return self._in_buffer_list[0]
349
in_buffer = ''.join(self._in_buffer_list)
350
if len(in_buffer) != self._in_buffer_len:
351
raise AssertionError(
352
"Length of buffer did not match expected value: %s != %s"
353
% self._in_buffer_len, len(in_buffer))
354
self._in_buffer_list = [in_buffer]
357
def _get_in_bytes(self, count):
358
"""Grab X bytes from the input_buffer.
360
Callers should have already checked that self._in_buffer_len is >
361
count. Note, this does not consume the bytes from the buffer. The
362
caller will still need to call _get_in_buffer() and then
363
_set_in_buffer() if they actually need to consume the bytes.
365
# check if we can yield the bytes from just the first entry in our list
366
if len(self._in_buffer_list) == 0:
367
raise AssertionError('Callers must be sure we have buffered bytes'
368
' before calling _get_in_bytes')
369
if len(self._in_buffer_list[0]) > count:
370
return self._in_buffer_list[0][:count]
371
# We can't yield it from the first buffer, so collapse all buffers, and
373
in_buf = self._get_in_buffer()
374
return in_buf[:count]
376
def _set_in_buffer(self, new_buf):
377
if new_buf is not None:
378
self._in_buffer_list = [new_buf]
379
self._in_buffer_len = len(new_buf)
381
self._in_buffer_list = []
382
self._in_buffer_len = 0
384
312
def accept_bytes(self, bytes):
385
313
"""Decode as much of bytes as possible.
552
469
# Reading excess data. Either way, 1 byte at a time is fine.
555
472
def read_pending_data(self):
556
473
"""Return any pending data that has been decoded."""
557
474
return self.state_read()
559
def _state_accept_expecting_length(self):
560
in_buf = self._get_in_buffer()
561
pos = in_buf.find('\n')
476
def _state_accept_expecting_length(self, bytes):
477
self._in_buffer += bytes
478
pos = self._in_buffer.find('\n')
564
self.bytes_left = int(in_buf[:pos])
565
self._set_in_buffer(in_buf[pos+1:])
481
self.bytes_left = int(self._in_buffer[:pos])
482
self._in_buffer = self._in_buffer[pos+1:]
483
self.bytes_left -= len(self._in_buffer)
566
484
self.state_accept = self._state_accept_reading_body
567
self.state_read = self._state_read_body_buffer
485
self.state_read = self._state_read_in_buffer
569
def _state_accept_reading_body(self):
570
in_buf = self._get_in_buffer()
572
self.bytes_left -= len(in_buf)
573
self._set_in_buffer(None)
487
def _state_accept_reading_body(self, bytes):
488
self._in_buffer += bytes
489
self.bytes_left -= len(bytes)
574
490
if self.bytes_left <= 0:
575
491
# Finished with body
576
492
if self.bytes_left != 0:
577
self._trailer_buffer = self._body[self.bytes_left:]
578
self._body = self._body[:self.bytes_left]
493
self._trailer_buffer = self._in_buffer[self.bytes_left:]
494
self._in_buffer = self._in_buffer[:self.bytes_left]
579
495
self.bytes_left = None
580
496
self.state_accept = self._state_accept_reading_trailer
582
def _state_accept_reading_trailer(self):
583
self._trailer_buffer += self._get_in_buffer()
584
self._set_in_buffer(None)
498
def _state_accept_reading_trailer(self, bytes):
499
self._trailer_buffer += bytes
585
500
# TODO: what if the trailer does not match "done\n"? Should this raise
586
501
# a ProtocolViolation exception?
587
502
if self._trailer_buffer.startswith('done\n'):
588
503
self.unused_data = self._trailer_buffer[len('done\n'):]
589
504
self.state_accept = self._state_accept_reading_unused
590
505
self.finished_reading = True
592
def _state_accept_reading_unused(self):
593
self.unused_data += self._get_in_buffer()
594
self._set_in_buffer(None)
507
def _state_accept_reading_unused(self, bytes):
508
self.unused_data += bytes
596
510
def _state_read_no_data(self):
599
def _state_read_body_buffer(self):
513
def _state_read_in_buffer(self):
514
result = self._in_buffer
605
519
class SmartClientRequestProtocolOne(SmartProtocolBase, Requester,
606
message.ResponseHandler):
520
message.ResponseHandler):
607
521
"""The client-side protocol for smart version 1."""
609
523
def __init__(self, request):
756
653
# The response will have no body, so we've finished reading.
757
654
self._request.finished_reading()
758
655
raise errors.UnknownSmartMethod(self._last_verb)
760
657
def read_body_bytes(self, count=-1):
761
658
"""Read bytes from the body, decoding into a byte stream.
763
We read all bytes at once to ensure we've checked the trailer for
660
We read all bytes at once to ensure we've checked the trailer for
764
661
errors, and then feed the buffer back as read_body_bytes is called.
766
663
if self._body_buffer is not None:
767
664
return self._body_buffer.read(count)
768
665
_body_decoder = LengthPrefixedBodyDecoder()
667
# Read no more than 64k at a time so that we don't risk error 10055 (no
668
# buffer space available) on Windows.
770
670
while not _body_decoder.finished_reading:
771
bytes = self._request.read_bytes(_body_decoder.next_read_size())
773
# end of file encountered reading from server
774
raise errors.ConnectionReset(
775
"Connection lost while reading response body.")
671
bytes_wanted = min(_body_decoder.next_read_size(), max_read)
672
bytes = self._request.read_bytes(bytes_wanted)
776
673
_body_decoder.accept_bytes(bytes)
777
674
self._request.finished_reading()
778
675
self._body_buffer = StringIO(_body_decoder.read_pending_data())
907
812
_StatefulDecoder.accept_bytes(self, bytes)
908
813
except KeyboardInterrupt:
910
except errors.SmartMessageHandlerError, exception:
911
# We do *not* set self.decoding_failed here. The message handler
912
# has raised an error, but the decoder is still able to parse bytes
913
# and determine when this message ends.
914
if not isinstance(exception.exc_value, errors.UnknownSmartMethod):
915
log_exception_quietly()
916
self.message_handler.protocol_error(exception.exc_value)
917
# The state machine is ready to continue decoding, but the
918
# exception has interrupted the loop that runs the state machine.
919
# So we call accept_bytes again to restart it.
920
self.accept_bytes('')
921
815
except Exception, exception:
922
# The decoder itself has raised an exception. We cannot continue
924
self.decoding_failed = True
925
if isinstance(exception, errors.UnexpectedProtocolVersionMarker):
926
# This happens during normal operation when the client tries a
927
# protocol version the server doesn't understand, so no need to
928
# log a traceback every time.
929
# Note that this can only happen when
930
# expect_version_marker=True, which is only the case on the
934
log_exception_quietly()
816
log_exception_quietly()
935
817
self.message_handler.protocol_error(exception)
937
820
def _extract_length_prefixed_bytes(self):
938
if self._in_buffer_len < 4:
821
if len(self._in_buffer) < 4:
939
822
# A length prefix by itself is 4 bytes, and we don't even have that
941
824
raise _NeedMoreBytes(4)
942
(length,) = struct.unpack('!L', self._get_in_bytes(4))
825
(length,) = struct.unpack('!L', self._in_buffer[:4])
943
826
end_of_bytes = 4 + length
944
if self._in_buffer_len < end_of_bytes:
827
if len(self._in_buffer) < end_of_bytes:
945
828
# We haven't yet read as many bytes as the length-prefix says there
947
830
raise _NeedMoreBytes(end_of_bytes)
948
831
# Extract the bytes from the buffer.
949
in_buf = self._get_in_buffer()
950
bytes = in_buf[4:end_of_bytes]
951
self._set_in_buffer(in_buf[end_of_bytes:])
832
bytes = self._in_buffer[4:end_of_bytes]
833
self._in_buffer = self._in_buffer[end_of_bytes:]
954
836
def _extract_prefixed_bencoded_data(self):
955
837
prefixed_bytes = self._extract_length_prefixed_bytes()
957
decoded = bdecode_as_tuple(prefixed_bytes)
839
decoded = bdecode(prefixed_bytes)
958
840
except ValueError:
959
841
raise errors.SmartProtocolError(
960
842
'Bytes %r not bencoded' % (prefixed_bytes,))
963
845
def _extract_single_byte(self):
964
if self._in_buffer_len == 0:
846
if self._in_buffer == '':
965
847
# The buffer is empty
966
848
raise _NeedMoreBytes(1)
967
in_buf = self._get_in_buffer()
969
self._set_in_buffer(in_buf[1:])
849
one_byte = self._in_buffer[0]
850
self._in_buffer = self._in_buffer[1:]
972
def _state_accept_expecting_protocol_version(self):
    """Consume the protocol version marker from the front of the buffer.

    On success the marker is removed from the buffer and the state machine
    moves on to _state_accept_expecting_headers.

    :raises _NeedMoreBytes: if the buffered bytes are a (possibly empty)
        proper prefix of MESSAGE_VERSION_THREE.
    :raises errors.UnexpectedProtocolVersionMarker: if the buffered bytes
        cannot be, or are not, the start of MESSAGE_VERSION_THREE.
    """
    needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
    in_buf = self._get_in_buffer()
    if needed_bytes > 0:
        # We don't have enough bytes to check if the protocol version
        # marker is right.  But we can check if it is already wrong by
        # checking that the start of MESSAGE_VERSION_THREE matches what
        # we have so far.
        # [In fact, if the remote end isn't bzr we might never receive
        # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
        # are wrong then we should just raise immediately rather than
        # wait for more.]
        if not MESSAGE_VERSION_THREE.startswith(in_buf):
            # We have enough bytes to know the protocol version is wrong
            raise errors.UnexpectedProtocolVersionMarker(in_buf)
        raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
    if not in_buf.startswith(MESSAGE_VERSION_THREE):
        raise errors.UnexpectedProtocolVersionMarker(in_buf)
    # Drop the marker and carry on with whatever follows it.
    self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
    self.state_accept = self._state_accept_expecting_headers
993
def _state_accept_expecting_headers(self):
853
def _state_accept_expecting_headers(self, bytes):
854
self._in_buffer += bytes
994
855
decoded = self._extract_prefixed_bencoded_data()
995
856
if type(decoded) is not dict:
996
857
raise errors.SmartProtocolError(
997
858
'Header object %r is not a dict' % (decoded,))
859
self.message_handler.headers_received(decoded)
998
860
self.state_accept = self._state_accept_expecting_message_part
1000
self.message_handler.headers_received(decoded)
1002
raise errors.SmartMessageHandlerError(sys.exc_info())
1004
def _state_accept_expecting_message_part(self):
862
def _state_accept_expecting_message_part(self, bytes):
863
self._in_buffer += bytes
1005
864
message_part_kind = self._extract_single_byte()
1006
865
if message_part_kind == 'o':
1007
866
self.state_accept = self._state_accept_expecting_one_byte
1015
874
raise errors.SmartProtocolError(
1016
875
'Bad message kind byte: %r' % (message_part_kind,))
1018
def _state_accept_expecting_one_byte(self):
877
def _state_accept_expecting_one_byte(self, bytes):
878
self._in_buffer += bytes
1019
879
byte = self._extract_single_byte()
880
self.message_handler.byte_part_received(byte)
1020
881
self.state_accept = self._state_accept_expecting_message_part
1022
self.message_handler.byte_part_received(byte)
1024
raise errors.SmartMessageHandlerError(sys.exc_info())
1026
def _state_accept_expecting_bytes(self):
883
def _state_accept_expecting_bytes(self, bytes):
1027
884
# XXX: this should not buffer whole message part, but instead deliver
1028
885
# the bytes as they arrive.
886
self._in_buffer += bytes
1029
887
prefixed_bytes = self._extract_length_prefixed_bytes()
888
self.message_handler.bytes_part_received(prefixed_bytes)
1030
889
self.state_accept = self._state_accept_expecting_message_part
1032
self.message_handler.bytes_part_received(prefixed_bytes)
1034
raise errors.SmartMessageHandlerError(sys.exc_info())
1036
def _state_accept_expecting_structure(self):
891
def _state_accept_expecting_structure(self, bytes):
892
self._in_buffer += bytes
1037
893
structure = self._extract_prefixed_bencoded_data()
894
self.message_handler.structure_part_received(structure)
1038
895
self.state_accept = self._state_accept_expecting_message_part
1040
self.message_handler.structure_part_received(structure)
1042
raise errors.SmartMessageHandlerError(sys.exc_info())
1045
self.unused_data = self._get_in_buffer()
1046
self._set_in_buffer(None)
898
self.unused_data = self._in_buffer
899
self._in_buffer = None
1047
900
self.state_accept = self._state_accept_reading_unused
1049
self.message_handler.end_received()
1051
raise errors.SmartMessageHandlerError(sys.exc_info())
901
self.message_handler.end_received()
1053
def _state_accept_reading_unused(self):
1054
self.unused_data += self._get_in_buffer()
1055
self._set_in_buffer(None)
903
def _state_accept_reading_unused(self, bytes):
904
self.unused_data += bytes
1057
906
def next_read_size(self):
1058
907
if self.state_accept == self._state_accept_reading_unused:
1060
elif self.decoding_failed:
1061
910
# An exception occurred while processing this message, probably from
1062
911
# self.message_handler. We're not sure that this state machine is
1063
912
# in a consistent state, so just signal that we're done (i.e. give
1073
922
class _ProtocolThreeEncoder(object):
1075
924
response_marker = request_marker = MESSAGE_VERSION_THREE
1076
BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1078
926
def __init__(self, write_func):
1081
self._real_write_func = write_func
1083
def _write_func(self, bytes):
1084
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1085
# for total number of bytes to write, rather than buffer based on
1086
# the number of write() calls
1087
# TODO: Another possibility would be to turn this into an async model.
1088
# Where we let another thread know that we have some bytes if
1089
# they want it, but we don't actually block for it
1090
# Note that osutils.send_all always sends 64kB chunks anyway, so
1091
# we might just push out smaller bits at a time?
1092
self._buf.append(bytes)
1093
self._buf_len += len(bytes)
1094
if self._buf_len > self.BUFFER_SIZE:
1099
self._real_write_func(''.join(self._buf))
927
self._write_func = write_func
1103
929
def _serialise_offsets(self, offsets):
1104
930
"""Serialise a readv offset list."""
1152
976
def __init__(self, write_func):
1153
977
_ProtocolThreeEncoder.__init__(self, write_func)
1154
978
self.response_sent = False
1155
self._headers = {'Software version': bzrlib.__version__}
1156
if 'hpss' in debug.debug_flags:
1157
self._thread_id = thread.get_ident()
1158
self._response_start_time = None
1160
def _trace(self, action, message, extra_bytes=None, include_time=False):
1161
if self._response_start_time is None:
1162
self._response_start_time = osutils.timer_func()
1164
t = '%5.3fs ' % (time.clock() - self._response_start_time)
1167
if extra_bytes is None:
1170
extra = ' ' + repr(extra_bytes[:40])
1172
extra = extra[:29] + extra[-1] + '...'
1173
mutter('%12s: [%s] %s%s%s'
1174
% (action, self._thread_id, t, message, extra))
1176
980
def send_error(self, exception):
1177
if self.response_sent:
1178
raise AssertionError(
1179
"send_error(%s) called, but response already sent."
981
assert not self.response_sent
1181
982
if isinstance(exception, errors.UnknownSmartMethod):
1182
983
failure = request.FailedSmartServerResponse(
1183
984
('UnknownMethod', exception.verb))
1184
985
self.send_response(failure)
1186
if 'hpss' in debug.debug_flags:
1187
self._trace('error', str(exception))
1188
987
self.response_sent = True
1189
self._write_protocol_version()
1190
self._write_headers(self._headers)
988
self._write_headers()
1191
989
self._write_error_status()
1192
990
self._write_structure(('error', str(exception)))
1193
991
self._write_end()
1195
993
def send_response(self, response):
1196
if self.response_sent:
1197
raise AssertionError(
1198
"send_response(%r) called, but response already sent."
994
assert not self.response_sent
1200
995
self.response_sent = True
1201
self._write_protocol_version()
1202
self._write_headers(self._headers)
996
self._write_headers()
1203
997
if response.is_successful():
1204
998
self._write_success_status()
1206
1000
self._write_error_status()
1207
if 'hpss' in debug.debug_flags:
1208
self._trace('response', repr(response.args))
1209
1001
self._write_structure(response.args)
1210
1002
if response.body is not None:
1211
1003
self._write_prefixed_body(response.body)
1212
if 'hpss' in debug.debug_flags:
1213
self._trace('body', '%d bytes' % (len(response.body),),
1214
response.body, include_time=True)
1215
1004
elif response.body_stream is not None:
1216
count = num_bytes = 0
1218
for exc_info, chunk in _iter_with_errors(response.body_stream):
1220
if exc_info is not None:
1221
self._write_error_status()
1222
error_struct = request._translate_error(exc_info[1])
1223
self._write_structure(error_struct)
1226
if isinstance(chunk, request.FailedSmartServerResponse):
1227
self._write_error_status()
1228
self._write_structure(chunk.args)
1230
num_bytes += len(chunk)
1231
if first_chunk is None:
1233
self._write_prefixed_body(chunk)
1234
if 'hpssdetail' in debug.debug_flags:
1235
# Not worth timing separately, as _write_func is
1237
self._trace('body chunk',
1238
'%d bytes' % (len(chunk),),
1239
chunk, suppress_time=True)
1240
if 'hpss' in debug.debug_flags:
1241
self._trace('body stream',
1242
'%d bytes %d chunks' % (num_bytes, count),
1005
for chunk in response.body_stream:
1006
self._write_prefixed_body(chunk)
1244
1007
self._write_end()
1245
if 'hpss' in debug.debug_flags:
1246
self._trace('response end', '', include_time=True)
1249
def _iter_with_errors(iterable):
    """Handle errors from iterable.next().

    Use like::

        for exc_info, value in _iter_with_errors(iterable):
            ...

    This is a safer alternative to::

        try:
            for value in iterable:
                ...
        except:
            ...

    Because the latter will catch errors from the for-loop body, not just
    iterable.next().

    If an error occurs, exc_info will be an exc_info tuple, and the generator
    will terminate.  Otherwise exc_info will be None, and value will be the
    value from iterable.next().  Note that KeyboardInterrupt and SystemExit
    will not be intercepted.
    """
    iterator = iter(iterable)
    while True:
        try:
            yield None, iterator.next()
        except StopIteration:
            # Normal exhaustion of the underlying iterator.
            return
        except (KeyboardInterrupt, SystemExit):
            # Never turn these into an in-band error value.
            raise
        except Exception:
            mutter('_iter_with_errors caught error')
            log_exception_quietly()
            yield sys.exc_info(), None
            # The error has been reported to the consumer; stop here.
            return
1288
1010
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1290
1012
def __init__(self, medium_request):
1291
1013
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1292
1014
self._medium_request = medium_request
1295
def set_headers(self, headers):
    """Store a copy of ``headers`` as the headers used by this object.

    :param headers: a mapping providing ``copy()``; the copy is what gets
        stored, so later changes to the caller's object are not seen here.
    """
    own_copy = headers.copy()
    self._headers = own_copy
1298
def call(self, *args):
1016
def call(self, *args, **kw):
1017
# XXX: ideally, signature would be call(self, *args, headers=None), but
1018
# python doesn't allow that. So, we fake it.
1021
headers = kw.pop('headers')
1023
raise TypeError('Unexpected keyword arguments: %r' % (kw,))
1299
1024
if 'hpss' in debug.debug_flags:
1300
1025
mutter('hpss call: %s', repr(args)[1:-1])
1301
1026
base = getattr(self._medium_request._medium, 'base', None)
1302
1027
if base is not None:
1303
1028
mutter(' (to %s)', base)
1304
self._request_start_time = osutils.timer_func()
1029
self._request_start_time = time.time()
1305
1030
self._write_protocol_version()
1306
self._write_headers(self._headers)
1031
self._write_headers(headers)
1307
1032
self._write_structure(args)
1308
1033
self._write_end()
1309
1034
self._medium_request.finished_writing()
1311
def call_with_body_bytes(self, args, body):
1036
def call_with_body_bytes(self, args, body, headers=None):
1312
1037
"""Make a remote call of args with body bytes 'body'.
1314
1039
After calling this, call read_response_tuple to find the result out.
1349
1074
self._write_end()
1350
1075
self._medium_request.finished_writing()
1352
def call_with_body_stream(self, args, stream):
1353
if 'hpss' in debug.debug_flags:
1354
mutter('hpss call w/body stream: %r', args)
1355
path = getattr(self._medium_request._medium, '_path', None)
1356
if path is not None:
1357
mutter(' (to %s)', path)
1358
self._request_start_time = osutils.timer_func()
1359
self._write_protocol_version()
1360
self._write_headers(self._headers)
1361
self._write_structure(args)
1362
# TODO: notice if the server has sent an early error reply before we
1363
# have finished sending the stream. We would notice at the end
1364
# anyway, but if the medium can deliver it early then it's good
1365
# to short-circuit the whole request...
1366
for exc_info, part in _iter_with_errors(stream):
1367
if exc_info is not None:
1368
# Iterating the stream failed. Cleanly abort the request.
1369
self._write_error_status()
1370
# Currently the client unconditionally sends ('error',) as the
1372
self._write_structure(('error',))
1374
self._medium_request.finished_writing()
1375
raise exc_info[0], exc_info[1], exc_info[2]
1377
self._write_prefixed_body(part)
1380
self._medium_request.finished_writing()