227
def _recv_tuple(from_file):
228
req_line = from_file.readline()
229
return _decode_tuple(req_line)
232
def _decode_tuple(req_line):
233
if req_line == None or req_line == '':
235
if req_line[-1] != '\n':
236
raise errors.SmartProtocolError("request %r not terminated" % req_line)
237
return tuple(req_line[:-1].split('\x01'))
240
def _encode_tuple(args):
241
"""Encode the tuple args to a bytestream."""
242
return '\x01'.join(args) + '\n'
245
class SmartProtocolBase(object):
246
"""Methods common to client and server"""
248
# TODO: this only actually accomodates a single block; possibly should
249
# support multiple chunks?
250
def _encode_bulk_data(self, body):
251
"""Encode body as a bulk data chunk."""
252
return ''.join(('%d\n' % len(body), body, 'done\n'))
254
def _serialise_offsets(self, offsets):
255
"""Serialise a readv offset list."""
257
for start, length in offsets:
258
txt.append('%d,%d' % (start, length))
259
return '\n'.join(txt)
262
class SmartServerRequestProtocolOne(SmartProtocolBase):
263
"""Server-side encoding and decoding logic for smart version 1."""
265
def __init__(self, backing_transport, write_func):
266
self._backing_transport = backing_transport
267
self.excess_buffer = ''
268
self._finished = False
270
self.has_dispatched = False
272
self._body_decoder = None
273
self._write_func = write_func
275
def accept_bytes(self, bytes):
276
"""Take bytes, and advance the internal state machine appropriately.
278
:param bytes: must be a byte string
280
assert isinstance(bytes, str)
281
self.in_buffer += bytes
282
if not self.has_dispatched:
283
if '\n' not in self.in_buffer:
284
# no command line yet
286
self.has_dispatched = True
288
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
290
req_args = _decode_tuple(first_line)
291
self.request = SmartServerRequestHandler(
292
self._backing_transport)
293
self.request.dispatch_command(req_args[0], req_args[1:])
294
if self.request.finished_reading:
296
self.excess_buffer = self.in_buffer
298
self._send_response(self.request.response.args,
299
self.request.response.body)
300
except KeyboardInterrupt:
302
except Exception, exception:
303
# everything else: pass to client, flush, and quit
304
self._send_response(('error', str(exception)))
307
if self.has_dispatched:
309
# nothing to do.XXX: this routine should be a single state
311
self.excess_buffer += self.in_buffer
314
if self._body_decoder is None:
315
self._body_decoder = LengthPrefixedBodyDecoder()
316
self._body_decoder.accept_bytes(self.in_buffer)
317
self.in_buffer = self._body_decoder.unused_data
318
body_data = self._body_decoder.read_pending_data()
319
self.request.accept_body(body_data)
320
if self._body_decoder.finished_reading:
321
self.request.end_of_body()
322
assert self.request.finished_reading, \
323
"no more body, request not finished"
324
if self.request.response is not None:
325
self._send_response(self.request.response.args,
326
self.request.response.body)
327
self.excess_buffer = self.in_buffer
330
assert not self.request.finished_reading, \
331
"no response and we have finished reading."
333
def _send_response(self, args, body=None):
334
"""Send a smart server response down the output stream."""
335
assert not self._finished, 'response already sent'
336
self._finished = True
337
self._write_func(_encode_tuple(args))
339
assert isinstance(body, str), 'body must be a str'
340
bytes = self._encode_bulk_data(body)
341
self._write_func(bytes)
343
def next_read_size(self):
346
if self._body_decoder is None:
349
return self._body_decoder.next_read_size()
352
class LengthPrefixedBodyDecoder(object):
353
"""Decodes the length-prefixed bulk data."""
356
self.bytes_left = None
357
self.finished_reading = False
358
self.unused_data = ''
359
self.state_accept = self._state_accept_expecting_length
360
self.state_read = self._state_read_no_data
362
self._trailer_buffer = ''
364
def accept_bytes(self, bytes):
365
"""Decode as much of bytes as possible.
367
If 'bytes' contains too much data it will be appended to
370
finished_reading will be set when no more data is required. Further
371
data will be appended to self.unused_data.
373
# accept_bytes is allowed to change the state
374
current_state = self.state_accept
375
self.state_accept(bytes)
376
while current_state != self.state_accept:
377
current_state = self.state_accept
378
self.state_accept('')
380
def next_read_size(self):
381
if self.bytes_left is not None:
382
# Ideally we want to read all the remainder of the body and the
384
return self.bytes_left + 5
385
elif self.state_accept == self._state_accept_reading_trailer:
386
# Just the trailer left
387
return 5 - len(self._trailer_buffer)
388
elif self.state_accept == self._state_accept_expecting_length:
389
# There's still at least 6 bytes left ('\n' to end the length, plus
393
# Reading excess data. Either way, 1 byte at a time is fine.
396
def read_pending_data(self):
397
"""Return any pending data that has been decoded."""
398
return self.state_read()
400
def _state_accept_expecting_length(self, bytes):
401
self._in_buffer += bytes
402
pos = self._in_buffer.find('\n')
405
self.bytes_left = int(self._in_buffer[:pos])
406
self._in_buffer = self._in_buffer[pos+1:]
407
self.bytes_left -= len(self._in_buffer)
408
self.state_accept = self._state_accept_reading_body
409
self.state_read = self._state_read_in_buffer
411
def _state_accept_reading_body(self, bytes):
412
self._in_buffer += bytes
413
self.bytes_left -= len(bytes)
414
if self.bytes_left <= 0:
416
if self.bytes_left != 0:
417
self._trailer_buffer = self._in_buffer[self.bytes_left:]
418
self._in_buffer = self._in_buffer[:self.bytes_left]
419
self.bytes_left = None
420
self.state_accept = self._state_accept_reading_trailer
422
def _state_accept_reading_trailer(self, bytes):
423
self._trailer_buffer += bytes
424
# TODO: what if the trailer does not match "done\n"? Should this raise
425
# a ProtocolViolation exception?
426
if self._trailer_buffer.startswith('done\n'):
427
self.unused_data = self._trailer_buffer[len('done\n'):]
428
self.state_accept = self._state_accept_reading_unused
429
self.finished_reading = True
431
def _state_accept_reading_unused(self, bytes):
432
self.unused_data += bytes
434
def _state_read_no_data(self):
437
def _state_read_in_buffer(self):
438
result = self._in_buffer
443
class SmartServerResponse(object):
444
"""Response generated by SmartServerRequestHandler."""
446
def __init__(self, args, body=None):
450
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
451
# for delivering the data for a request. This could be done with as the
452
# StreamServer, though that would create conflation between request and response
453
# which may be undesirable.
456
229
class SmartServerRequestHandler(object):
457
230
"""Protocol logic for smart server.
644
417
if not self._converted_command:
645
418
self.finished_reading = True
646
419
if result is None:
647
self.response = SmartServerResponse(('ok',))
420
self.response = protocol.SmartServerResponse(('ok',))
649
422
def _call_converting_errors(self, callable, args, kwargs):
650
423
"""Call callable converting errors to Response objects."""
652
425
return callable(*args, **kwargs)
653
426
except errors.NoSuchFile, e:
654
return SmartServerResponse(('NoSuchFile', e.path))
427
return protocol.SmartServerResponse(('NoSuchFile', e.path))
655
428
except errors.FileExists, e:
656
return SmartServerResponse(('FileExists', e.path))
429
return protocol.SmartServerResponse(('FileExists', e.path))
657
430
except errors.DirectoryNotEmpty, e:
658
return SmartServerResponse(('DirectoryNotEmpty', e.path))
431
return protocol.SmartServerResponse(('DirectoryNotEmpty', e.path))
659
432
except errors.ShortReadvError, e:
660
return SmartServerResponse(('ShortReadvError',
433
return protocol.SmartServerResponse(('ShortReadvError',
661
434
e.path, str(e.offset), str(e.length), str(e.actual)))
662
435
except UnicodeError, e:
663
436
# If it is a DecodeError, than most likely we are starting
1141
918
self._translate_error(resp)
1144
class SmartClientRequestProtocolOne(SmartProtocolBase):
1145
"""The client-side protocol for smart version 1."""
1147
def __init__(self, request):
1148
"""Construct a SmartClientRequestProtocolOne.
1150
:param request: A SmartClientMediumRequest to serialise onto and
1153
self._request = request
1154
self._body_buffer = None
1156
def call(self, *args):
1157
bytes = _encode_tuple(args)
1158
self._request.accept_bytes(bytes)
1159
self._request.finished_writing()
1161
def call_with_body_bytes(self, args, body):
1162
"""Make a remote call of args with body bytes 'body'.
1164
After calling this, call read_response_tuple to find the result out.
1166
bytes = _encode_tuple(args)
1167
self._request.accept_bytes(bytes)
1168
bytes = self._encode_bulk_data(body)
1169
self._request.accept_bytes(bytes)
1170
self._request.finished_writing()
1172
def call_with_body_readv_array(self, args, body):
1173
"""Make a remote call with a readv array.
1175
The body is encoded with one line per readv offset pair. The numbers in
1176
each pair are separated by a comma, and no trailing \n is emitted.
1178
bytes = _encode_tuple(args)
1179
self._request.accept_bytes(bytes)
1180
readv_bytes = self._serialise_offsets(body)
1181
bytes = self._encode_bulk_data(readv_bytes)
1182
self._request.accept_bytes(bytes)
1183
self._request.finished_writing()
1185
def cancel_read_body(self):
1186
"""After expecting a body, a response code may indicate one otherwise.
1188
This method lets the domain client inform the protocol that no body
1189
will be transmitted. This is a terminal method: after calling it the
1190
protocol is not able to be used further.
1192
self._request.finished_reading()
1194
def read_response_tuple(self, expect_body=False):
1195
"""Read a response tuple from the wire.
1197
This should only be called once.
1199
result = self._recv_tuple()
1201
self._request.finished_reading()
1204
def read_body_bytes(self, count=-1):
1205
"""Read bytes from the body, decoding into a byte stream.
1207
We read all bytes at once to ensure we've checked the trailer for
1208
errors, and then feed the buffer back as read_body_bytes is called.
1210
if self._body_buffer is not None:
1211
return self._body_buffer.read(count)
1212
_body_decoder = LengthPrefixedBodyDecoder()
1214
while not _body_decoder.finished_reading:
1215
bytes_wanted = _body_decoder.next_read_size()
1216
bytes = self._request.read_bytes(bytes_wanted)
1217
_body_decoder.accept_bytes(bytes)
1218
self._request.finished_reading()
1219
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1220
# XXX: TODO check the trailer result.
1221
return self._body_buffer.read(count)
1223
def _recv_tuple(self):
1224
"""Receive a tuple from the medium request."""
1226
while not line or line[-1] != '\n':
1227
# TODO: this is inefficient - but tuples are short.
1228
new_char = self._request.read_bytes(1)
1230
assert new_char != '', "end of file reading from server."
1231
return _decode_tuple(line)
1233
def query_version(self):
1234
"""Return protocol version number of the server."""
1236
resp = self.read_response_tuple()
1237
if resp == ('ok', '1'):
1240
raise errors.SmartProtocolError("bad response %r" % (resp,))
1243
921
class SmartTCPTransport(SmartTransport):
1244
922
"""Connection to smart server over plain tcp.