179
179
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
182
def _send_tuple(to_file, args):
183
# XXX: this will be inefficient. Just ask Robert.
184
to_file.write('\x01'.join((a.encode('utf-8') for a in args)) + '\n')
182
def _encode_tuple(args):
183
"""Encode the tuple args to a bytestream."""
184
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
188
187
class SmartProtocolBase(object):
191
190
def _send_bulk_data(self, body):
192
191
"""Send chunked body data"""
193
192
assert isinstance(body, str)
194
self._out.write('%d\n' % len(body))
195
self._out.write(body)
196
self._out.write('done\n')
193
bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
194
self._write_and_flush(bytes)
199
196
# TODO: this only actually accomodates a single block; possibly should support
200
197
# multiple chunks?
221
218
self._translate_error(resp)
220
def _serialise_offsets(self, offsets):
221
"""Serialise a readv offset list."""
223
for start, length in offsets:
224
txt.append('%d,%d' % (start, length))
225
return '\n'.join(txt)
227
def _write_and_flush(self, bytes):
228
"""Write bytes to self._out and flush it."""
229
# XXX: this will be inefficient. Just ask Robert.
230
self._out.write(bytes)
224
234
class SmartStreamServer(SmartProtocolBase):
225
235
"""Handles smart commands coming over a stream.
258
268
def _send_tuple(self, args):
259
269
"""Send response header"""
260
return _send_tuple(self._out, args)
270
return self._write_and_flush(_encode_tuple(args))
262
272
def _send_error_and_disconnect(self, exception):
263
273
self._send_tuple(('error', str(exception)))
265
274
## self._out.close()
266
275
## self._in.close()
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
321
# for delivering the data for a request. This could be done with as the
322
# StreamServer, though that would create conflation between request and response
323
# which may be undesirable.
312
326
class SmartServer(object):
313
327
"""Protocol logic for smart server.
316
330
creates responses.
333
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
# encoding or decoding logic to allow the wire protocol to vary from the
335
# object protocol: we will want to tweak the wire protocol separate from
336
# the object model, and ideally we will be able to do that without having
337
# a SmartServer subclass for each wire protocol, rather just a Protocol
319
340
# TODO: Better way of representing the body for commands that take it,
320
341
# and allow it to be streamed into the server.
371
393
self._recv_body(),
372
394
self._deserialise_optional_mode(mode))
396
def _deserialise_offsets(self, text):
397
# XXX: FIXME this should be on the protocol object.
399
for line in text.split('\n'):
402
start, length = line.split(',')
403
offsets.append((int(start), int(length)))
406
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
407
create_parent_dir = (create_parent == 'T')
408
self._backing_transport.put_bytes_non_atomic(relpath,
410
mode=self._deserialise_optional_mode(mode),
411
create_parent_dir=create_parent_dir,
412
dir_mode=self._deserialise_optional_mode(dir_mode))
414
def do_readv(self, relpath):
415
offsets = self._deserialise_offsets(self._recv_body())
416
backing_bytes = ''.join(bytes for offset, bytes in
417
self._backing_transport.readv(relpath, offsets))
418
return SmartServerResponse(('readv',), backing_bytes)
374
420
def do_rename(self, rel_from, rel_to):
375
421
self._backing_transport.rename(rel_from, rel_to)
407
453
return SmartServerResponse(('FileExists', e.path))
408
454
except errors.DirectoryNotEmpty, e:
409
455
return SmartServerResponse(('DirectoryNotEmpty', e.path))
456
except errors.ShortReadvError, e:
457
return SmartServerResponse(('ShortReadvError',
458
e.path, str(e.offset), str(e.length), str(e.actual)))
410
459
except UnicodeError, e:
411
460
# If it is a DecodeError, than most likely we are starting
412
461
# with a plain string
547
596
type: SmartTCPTransport, etc.
599
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
600
# responsibilities: Put those on SmartClient or similar. This is vital for
601
# the ability to support multiple versions of the smart protocol over time:
602
# SmartTransport is an adapter from the Transport object model to the
603
# SmartClient model, not an encoder.
550
605
def __init__(self, url, clone_from=None, client=None):
654
709
self._serialise_optional_mode(mode))
655
710
self._translate_error(resp)
712
def put_bytes(self, relpath, upload_contents, mode=None):
713
# FIXME: upload_file is probably not safe for non-ascii characters -
714
# should probably just pass all parameters as length-delimited
716
resp = self._client._call_with_upload(
718
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
720
self._translate_error(resp)
722
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
723
create_parent_dir=False,
725
"""See Transport.put_bytes_non_atomic."""
726
# FIXME: no encoding in the transport!
727
create_parent_str = 'F'
728
if create_parent_dir:
729
create_parent_str = 'T'
731
resp = self._client._call_with_upload(
733
(self._remote_path(relpath), self._serialise_optional_mode(mode),
734
create_parent_str, self._serialise_optional_mode(dir_mode)),
736
self._translate_error(resp)
657
738
def put_file(self, relpath, upload_file, mode=None):
658
739
# its not ideal to seek back, but currently put_non_atomic_file depends
659
740
# on transports not reading before failing - which is a faulty
665
746
upload_file.seek(pos)
668
def put_bytes(self, relpath, upload_contents, mode=None):
669
# FIXME: upload_file is probably not safe for non-ascii characters -
670
# should probably just pass all parameters as length-delimited
672
resp = self._client._call_with_upload(
674
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
676
self._translate_error(resp)
749
def put_file_non_atomic(self, relpath, f, mode=None,
750
create_parent_dir=False,
752
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
753
create_parent_dir=create_parent_dir,
678
756
def append_file(self, relpath, from_file, mode=None):
679
757
return self.append_bytes(relpath, from_file.read(), mode)
691
769
resp = self._client._call('delete', self._remote_path(relpath))
692
770
self._translate_error(resp)
772
def readv(self, relpath, offsets):
776
offsets = list(offsets)
778
sorted_offsets = sorted(offsets)
779
# turn the list of offsets into a stack
780
offset_stack = iter(offsets)
781
cur_offset_and_size = offset_stack.next()
782
coalesced = list(self._coalesce_offsets(sorted_offsets,
783
limit=self._max_readv_combine,
784
fudge_factor=self._bytes_to_read_before_seek))
787
resp = self._client._call_with_upload(
789
(self._remote_path(relpath),),
790
self._client._serialise_offsets((c.start, c.length) for c in coalesced))
792
if resp[0] != 'readv':
793
# This should raise an exception
794
self._translate_error(resp)
797
data = self._client._recv_bulk()
798
# Cache the results, but only until they have been fulfilled
800
for c_offset in coalesced:
801
if len(data) < c_offset.length:
802
raise errors.ShortReadvError(relpath, c_offset.start,
803
c_offset.length, actual=len(data))
804
for suboffset, subsize in c_offset.ranges:
805
key = (c_offset.start+suboffset, subsize)
806
data_map[key] = data[suboffset:suboffset+subsize]
807
data = data[c_offset.length:]
809
# Now that we've read some data, see if we can yield anything back
810
while cur_offset_and_size in data_map:
811
this_data = data_map.pop(cur_offset_and_size)
812
yield cur_offset_and_size[0], this_data
813
cur_offset_and_size = offset_stack.next()
694
815
def rename(self, rel_from, rel_to):
695
816
self._call('rename',
696
817
self._remote_path(rel_from),
728
849
raise errors.FileExists(resp[1])
729
850
elif what == 'DirectoryNotEmpty':
730
851
raise errors.DirectoryNotEmpty(resp[1])
852
elif what == 'ShortReadvError':
853
raise errors.ShortReadvError(resp[1], int(resp[2]),
854
int(resp[3]), int(resp[4]))
731
855
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
732
856
encoding = str(resp[1]) # encoding must always be a string
815
939
def _send_tuple(self, args):
816
940
self._ensure_connection()
817
_send_tuple(self._out, args)
941
return self._write_and_flush(_encode_tuple(args))
819
943
def _send_bulk_data(self, body):
820
944
self._ensure_connection()