/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
179
179
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
180
180
 
181
181
 
182
 
def _send_tuple(to_file, args):
183
 
    # XXX: this will be inefficient.  Just ask Robert.
184
 
    to_file.write('\x01'.join((a.encode('utf-8') for a in args)) + '\n')
185
 
    to_file.flush()
 
182
def _encode_tuple(args):
 
183
    """Encode the tuple args to a bytestream."""
 
184
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
186
185
 
187
186
 
188
187
class SmartProtocolBase(object):
191
190
    def _send_bulk_data(self, body):
192
191
        """Send chunked body data"""
193
192
        assert isinstance(body, str)
194
 
        self._out.write('%d\n' % len(body))
195
 
        self._out.write(body)
196
 
        self._out.write('done\n')
197
 
        self._out.flush()
 
193
        bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
 
194
        self._write_and_flush(bytes)
198
195
 
199
196
    # TODO: this only actually accomodates a single block; possibly should support
200
197
    # multiple chunks?
220
217
        else:
221
218
            self._translate_error(resp)
222
219
 
 
220
    def _serialise_offsets(self, offsets):
 
221
        """Serialise a readv offset list."""
 
222
        txt = []
 
223
        for start, length in offsets:
 
224
            txt.append('%d,%d' % (start, length))
 
225
        return '\n'.join(txt)
 
226
 
 
227
    def _write_and_flush(self, bytes):
 
228
        """Write bytes to self._out and flush it."""
 
229
        # XXX: this will be inefficient.  Just ask Robert.
 
230
        self._out.write(bytes)
 
231
        self._out.flush()
 
232
 
223
233
 
224
234
class SmartStreamServer(SmartProtocolBase):
225
235
    """Handles smart commands coming over a stream.
257
267
 
258
268
    def _send_tuple(self, args):
259
269
        """Send response header"""
260
 
        return _send_tuple(self._out, args)
 
270
        return self._write_and_flush(_encode_tuple(args))
261
271
 
262
272
    def _send_error_and_disconnect(self, exception):
263
273
        self._send_tuple(('error', str(exception)))
264
 
        self._out.flush()
265
274
        ## self._out.close()
266
275
        ## self._in.close()
267
276
 
308
317
        self.args = args
309
318
        self.body = body
310
319
 
 
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
 
321
# for delivering the data for a request. This could be done with as the
 
322
# StreamServer, though that would create conflation between request and response
 
323
# which may be undesirable.
 
324
 
311
325
 
312
326
class SmartServer(object):
313
327
    """Protocol logic for smart server.
316
330
    creates responses.
317
331
    """
318
332
 
 
333
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
 
334
    # encoding or decoding logic to allow the wire protocol to vary from the
 
335
    # object protocol: we will want to tweak the wire protocol separate from
 
336
    # the object model, and ideally we will be able to do that without having
 
337
    # a SmartServer subclass for each wire protocol, rather just a Protocol
 
338
    # subclass.
 
339
 
319
340
    # TODO: Better way of representing the body for commands that take it,
320
341
    # and allow it to be streamed into the server.
321
342
    
335
356
        return SmartServerResponse(('ok',), backing_bytes)
336
357
 
337
358
    def _deserialise_optional_mode(self, mode):
 
359
        # XXX: FIXME this should be on the protocol object.
338
360
        if mode == '':
339
361
            return None
340
362
        else:
371
393
                self._recv_body(),
372
394
                self._deserialise_optional_mode(mode))
373
395
 
 
396
    def _deserialise_offsets(self, text):
 
397
        # XXX: FIXME this should be on the protocol object.
 
398
        offsets = []
 
399
        for line in text.split('\n'):
 
400
            if not line:
 
401
                continue
 
402
            start, length = line.split(',')
 
403
            offsets.append((int(start), int(length)))
 
404
        return offsets
 
405
 
 
406
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
407
        create_parent_dir = (create_parent == 'T')
 
408
        self._backing_transport.put_bytes_non_atomic(relpath,
 
409
                self._recv_body(),
 
410
                mode=self._deserialise_optional_mode(mode),
 
411
                create_parent_dir=create_parent_dir,
 
412
                dir_mode=self._deserialise_optional_mode(dir_mode))
 
413
 
 
414
    def do_readv(self, relpath):
 
415
        offsets = self._deserialise_offsets(self._recv_body())
 
416
        backing_bytes = ''.join(bytes for offset, bytes in
 
417
                             self._backing_transport.readv(relpath, offsets))
 
418
        return SmartServerResponse(('readv',), backing_bytes)
 
419
        
374
420
    def do_rename(self, rel_from, rel_to):
375
421
        self._backing_transport.rename(rel_from, rel_to)
376
422
 
407
453
            return SmartServerResponse(('FileExists', e.path))
408
454
        except errors.DirectoryNotEmpty, e:
409
455
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
456
        except errors.ShortReadvError, e:
 
457
            return SmartServerResponse(('ShortReadvError',
 
458
                e.path, str(e.offset), str(e.length), str(e.actual)))
410
459
        except UnicodeError, e:
411
460
            # If it is a DecodeError, than most likely we are starting
412
461
            # with a plain string
547
596
    type: SmartTCPTransport, etc.
548
597
    """
549
598
 
 
599
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
600
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
601
    # the ability to support multiple versions of the smart protocol over time:
 
602
    # SmartTransport is an adapter from the Transport object model to the 
 
603
    # SmartClient model, not an encoder.
 
604
 
550
605
    def __init__(self, url, clone_from=None, client=None):
551
606
        """Constructor.
552
607
 
654
709
                                  self._serialise_optional_mode(mode))
655
710
        self._translate_error(resp)
656
711
 
 
712
    def put_bytes(self, relpath, upload_contents, mode=None):
 
713
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
714
        # should probably just pass all parameters as length-delimited
 
715
        # strings?
 
716
        resp = self._client._call_with_upload(
 
717
            'put',
 
718
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
719
            upload_contents)
 
720
        self._translate_error(resp)
 
721
 
 
722
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
723
                             create_parent_dir=False,
 
724
                             dir_mode=None):
 
725
        """See Transport.put_bytes_non_atomic."""
 
726
        # FIXME: no encoding in the transport!
 
727
        create_parent_str = 'F'
 
728
        if create_parent_dir:
 
729
            create_parent_str = 'T'
 
730
 
 
731
        resp = self._client._call_with_upload(
 
732
            'put_non_atomic',
 
733
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
734
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
735
            bytes)
 
736
        self._translate_error(resp)
 
737
 
657
738
    def put_file(self, relpath, upload_file, mode=None):
658
739
        # its not ideal to seek back, but currently put_non_atomic_file depends
659
740
        # on transports not reading before failing - which is a faulty
665
746
            upload_file.seek(pos)
666
747
            raise
667
748
 
668
 
    def put_bytes(self, relpath, upload_contents, mode=None):
669
 
        # FIXME: upload_file is probably not safe for non-ascii characters -
670
 
        # should probably just pass all parameters as length-delimited
671
 
        # strings?
672
 
        resp = self._client._call_with_upload(
673
 
            'put',
674
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
675
 
            upload_contents)
676
 
        self._translate_error(resp)
 
749
    def put_file_non_atomic(self, relpath, f, mode=None,
 
750
                            create_parent_dir=False,
 
751
                            dir_mode=None):
 
752
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
753
                                         create_parent_dir=create_parent_dir,
 
754
                                         dir_mode=dir_mode)
677
755
 
678
756
    def append_file(self, relpath, from_file, mode=None):
679
757
        return self.append_bytes(relpath, from_file.read(), mode)
691
769
        resp = self._client._call('delete', self._remote_path(relpath))
692
770
        self._translate_error(resp)
693
771
 
 
772
    def readv(self, relpath, offsets):
 
773
        if not offsets:
 
774
            return
 
775
 
 
776
        offsets = list(offsets)
 
777
 
 
778
        sorted_offsets = sorted(offsets)
 
779
        # turn the list of offsets into a stack
 
780
        offset_stack = iter(offsets)
 
781
        cur_offset_and_size = offset_stack.next()
 
782
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
783
                               limit=self._max_readv_combine,
 
784
                               fudge_factor=self._bytes_to_read_before_seek))
 
785
 
 
786
 
 
787
        resp = self._client._call_with_upload(
 
788
            'readv',
 
789
            (self._remote_path(relpath),),
 
790
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
791
 
 
792
        if resp[0] != 'readv':
 
793
            # This should raise an exception
 
794
            self._translate_error(resp)
 
795
            return
 
796
 
 
797
        data = self._client._recv_bulk()
 
798
        # Cache the results, but only until they have been fulfilled
 
799
        data_map = {}
 
800
        for c_offset in coalesced:
 
801
            if len(data) < c_offset.length:
 
802
                raise errors.ShortReadvError(relpath, c_offset.start,
 
803
                            c_offset.length, actual=len(data))
 
804
            for suboffset, subsize in c_offset.ranges:
 
805
                key = (c_offset.start+suboffset, subsize)
 
806
                data_map[key] = data[suboffset:suboffset+subsize]
 
807
            data = data[c_offset.length:]
 
808
 
 
809
            # Now that we've read some data, see if we can yield anything back
 
810
            while cur_offset_and_size in data_map:
 
811
                this_data = data_map.pop(cur_offset_and_size)
 
812
                yield cur_offset_and_size[0], this_data
 
813
                cur_offset_and_size = offset_stack.next()
 
814
 
694
815
    def rename(self, rel_from, rel_to):
695
816
        self._call('rename', 
696
817
                   self._remote_path(rel_from),
728
849
            raise errors.FileExists(resp[1])
729
850
        elif what == 'DirectoryNotEmpty':
730
851
            raise errors.DirectoryNotEmpty(resp[1])
 
852
        elif what == 'ShortReadvError':
 
853
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
854
                                         int(resp[3]), int(resp[4]))
731
855
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
732
856
            encoding = str(resp[1]) # encoding must always be a string
733
857
            val = resp[2]
814
938
 
815
939
    def _send_tuple(self, args):
816
940
        self._ensure_connection()
817
 
        _send_tuple(self._out, args)
 
941
        return self._write_and_flush(_encode_tuple(args))
818
942
 
819
943
    def _send_bulk_data(self, body):
820
944
        self._ensure_connection()