/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Jelmer Vernooij
  • Date: 2009-02-25 15:36:48 UTC
  • mfrom: (4048 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4050.
  • Revision ID: jelmer@samba.org-20090225153648-7r5mk20nr9dttqbf
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
from bzrlib import errors
30
30
from bzrlib.smart import message, request
31
31
from bzrlib.trace import log_exception_quietly, mutter
32
 
from bzrlib.util.bencode import bdecode, bencode
 
32
from bzrlib.util.bencode import bdecode_as_tuple, bencode
33
33
 
34
34
 
35
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
109
109
        for start, length in offsets:
110
110
            txt.append('%d,%d' % (start, length))
111
111
        return '\n'.join(txt)
112
 
        
 
112
 
113
113
 
114
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
115
    """Server-side encoding and decoding logic for smart version 1."""
116
 
    
 
116
 
117
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
118
118
        self._backing_transport = backing_transport
119
119
        self._root_client_path = root_client_path
127
127
 
128
128
    def accept_bytes(self, bytes):
129
129
        """Take bytes, and advance the internal state machine appropriately.
130
 
        
 
130
 
131
131
        :param bytes: must be a byte string
132
132
        """
133
133
        if not isinstance(bytes, str):
169
169
 
170
170
        if self._has_dispatched:
171
171
            if self._finished:
172
 
                # nothing to do.XXX: this routine should be a single state 
 
172
                # nothing to do.XXX: this routine should be a single state
173
173
                # machine too.
174
174
                self.unused_data += self.in_buffer
175
175
                self.in_buffer = ''
211
211
 
212
212
    def _write_protocol_version(self):
213
213
        """Write any prefixes this protocol requires.
214
 
        
 
214
 
215
215
        Version one doesn't send protocol versions.
216
216
        """
217
217
 
234
234
 
235
235
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
236
236
    r"""Version two of the server side of the smart protocol.
237
 
   
 
237
 
238
238
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
239
239
    """
240
240
 
250
250
 
251
251
    def _write_protocol_version(self):
252
252
        r"""Write any prefixes this protocol requires.
253
 
        
 
253
 
254
254
        Version two sends the value of RESPONSE_VERSION_TWO.
255
255
        """
256
256
        self._write_func(self.response_marker)
412
412
        self.chunks = collections.deque()
413
413
        self.error = False
414
414
        self.error_in_progress = None
415
 
    
 
415
 
416
416
    def next_read_size(self):
417
417
        # Note: the shortest possible chunk is 2 bytes: '0\n', and the
418
418
        # end-of-body marker is 4 bytes: 'END\n'.
506
506
                self.chunks.append(self.chunk_in_progress)
507
507
            self.chunk_in_progress = None
508
508
            self.state_accept = self._state_accept_expecting_length
509
 
        
 
509
 
510
510
    def _state_accept_reading_unused(self):
511
511
        self.unused_data += self._get_in_buffer()
512
512
        self._in_buffer_list = []
514
514
 
515
515
class LengthPrefixedBodyDecoder(_StatefulDecoder):
516
516
    """Decodes the length-prefixed bulk data."""
517
 
    
 
517
 
518
518
    def __init__(self):
519
519
        _StatefulDecoder.__init__(self)
520
520
        self.state_accept = self._state_accept_expecting_length
521
521
        self.state_read = self._state_read_no_data
522
522
        self._body = ''
523
523
        self._trailer_buffer = ''
524
 
    
 
524
 
525
525
    def next_read_size(self):
526
526
        if self.bytes_left is not None:
527
527
            # Ideally we want to read all the remainder of the body and the
537
537
        else:
538
538
            # Reading excess data.  Either way, 1 byte at a time is fine.
539
539
            return 1
540
 
        
 
540
 
541
541
    def read_pending_data(self):
542
542
        """Return any pending data that has been decoded."""
543
543
        return self.state_read()
564
564
                self._body = self._body[:self.bytes_left]
565
565
            self.bytes_left = None
566
566
            self.state_accept = self._state_accept_reading_trailer
567
 
        
 
567
 
568
568
    def _state_accept_reading_trailer(self):
569
569
        self._trailer_buffer += self._get_in_buffer()
570
570
        self._set_in_buffer(None)
574
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
575
575
            self.state_accept = self._state_accept_reading_unused
576
576
            self.finished_reading = True
577
 
    
 
577
 
578
578
    def _state_accept_reading_unused(self):
579
579
        self.unused_data += self._get_in_buffer()
580
580
        self._set_in_buffer(None)
656
656
            mutter('              %d bytes in readv request', len(readv_bytes))
657
657
        self._last_verb = args[0]
658
658
 
 
659
    def call_with_body_stream(self, args, stream):
 
660
        # Protocols v1 and v2 don't support body streams.  So it's safe to
 
661
        # assume that a v1/v2 server doesn't support whatever method we're
 
662
        # trying to call with a body stream.
 
663
        self._request.finished_writing()
 
664
        self._request.finished_reading()
 
665
        raise errors.UnknownSmartMethod(args[0])
 
666
 
659
667
    def cancel_read_body(self):
660
668
        """After expecting a body, a response code may indicate one otherwise.
661
669
 
721
729
    def _response_is_unknown_method(self, result_tuple):
722
730
        """Raise UnexpectedSmartServerResponse if the response is an 'unknonwn
723
731
        method' response to the request.
724
 
        
 
732
 
725
733
        :param response: The response from a smart client call_expecting_body
726
734
            call.
727
735
        :param verb: The verb used in that call.
734
742
            # The response will have no body, so we've finished reading.
735
743
            self._request.finished_reading()
736
744
            raise errors.UnknownSmartMethod(self._last_verb)
737
 
        
 
745
 
738
746
    def read_body_bytes(self, count=-1):
739
747
        """Read bytes from the body, decoding into a byte stream.
740
 
        
741
 
        We read all bytes at once to ensure we've checked the trailer for 
 
748
 
 
749
        We read all bytes at once to ensure we've checked the trailer for
742
750
        errors, and then feed the buffer back as read_body_bytes is called.
743
751
        """
744
752
        if self._body_buffer is not None:
782
790
 
783
791
    def _write_protocol_version(self):
784
792
        """Write any prefixes this protocol requires.
785
 
        
 
793
 
786
794
        Version one doesn't send protocol versions.
787
795
        """
788
796
 
789
797
 
790
798
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
791
799
    """Version two of the client side of the smart protocol.
792
 
    
 
800
 
793
801
    This prefixes the request with the value of REQUEST_VERSION_TWO.
794
802
    """
795
803
 
823
831
 
824
832
    def _write_protocol_version(self):
825
833
        """Write any prefixes this protocol requires.
826
 
        
 
834
 
827
835
        Version two sends the value of REQUEST_VERSION_TWO.
828
836
        """
829
837
        self._request.accept_bytes(self.request_marker)
931
939
    def _extract_prefixed_bencoded_data(self):
932
940
        prefixed_bytes = self._extract_length_prefixed_bytes()
933
941
        try:
934
 
            decoded = bdecode(prefixed_bytes)
 
942
            decoded = bdecode_as_tuple(prefixed_bytes)
935
943
        except ValueError:
936
944
            raise errors.SmartProtocolError(
937
945
                'Bytes %r not bencoded' % (prefixed_bytes,))
977
985
            self.message_handler.headers_received(decoded)
978
986
        except:
979
987
            raise errors.SmartMessageHandlerError(sys.exc_info())
980
 
    
 
988
 
981
989
    def _state_accept_expecting_message_part(self):
982
990
        message_part_kind = self._extract_single_byte()
983
991
        if message_part_kind == 'o':
1069
1077
        for start, length in offsets:
1070
1078
            txt.append('%d,%d' % (start, length))
1071
1079
        return '\n'.join(txt)
1072
 
        
 
1080
 
1073
1081
    def _write_protocol_version(self):
1074
1082
        self._write_func(MESSAGE_VERSION_THREE)
1075
1083
 
1100
1108
        self._write_func(struct.pack('!L', len(bytes)))
1101
1109
        self._write_func(bytes)
1102
1110
 
 
1111
    def _write_chunked_body_start(self):
 
1112
        self._write_func('oC')
 
1113
 
1103
1114
    def _write_error_status(self):
1104
1115
        self._write_func('oE')
1105
1116
 
1151
1162
                self._write_prefixed_body(chunk)
1152
1163
                self.flush()
1153
1164
        self._write_end()
1154
 
        
 
1165
 
1155
1166
 
1156
1167
class ProtocolThreeRequester(_ProtocolThreeEncoder, Requester):
1157
1168
 
1162
1173
 
1163
1174
    def set_headers(self, headers):
1164
1175
        self._headers = headers.copy()
1165
 
        
 
1176
 
1166
1177
    def call(self, *args):
1167
1178
        if 'hpss' in debug.debug_flags:
1168
1179
            mutter('hpss call:   %s', repr(args)[1:-1])
1217
1228
        self._write_end()
1218
1229
        self._medium_request.finished_writing()
1219
1230
 
 
1231
    def call_with_body_stream(self, args, stream):
 
1232
        if 'hpss' in debug.debug_flags:
 
1233
            mutter('hpss call w/body stream: %r', args)
 
1234
            path = getattr(self._medium_request._medium, '_path', None)
 
1235
            if path is not None:
 
1236
                mutter('                  (to %s)', path)
 
1237
            self._request_start_time = time.time()
 
1238
        self._write_protocol_version()
 
1239
        self._write_headers(self._headers)
 
1240
        self._write_structure(args)
 
1241
        # TODO: notice if the server has sent an early error reply before we
 
1242
        #       have finished sending the stream.  We would notice at the end
 
1243
        #       anyway, but if the medium can deliver it early then it's good
 
1244
        #       to short-circuit the whole request...
 
1245
        try:
 
1246
            for part in stream:
 
1247
                self._write_prefixed_body(part)
 
1248
                self.flush()
 
1249
        except Exception:
 
1250
            exc_info = sys.exc_info()
 
1251
            # Iterating the stream failed.  Cleanly abort the request.
 
1252
            self._write_error_status()
 
1253
            # Currently the client unconditionally sends ('error',) as the
 
1254
            # error args.
 
1255
            self._write_structure(('error',))
 
1256
            self._write_end()
 
1257
            self._medium_request.finished_writing()
 
1258
            raise exc_info[0], exc_info[1], exc_info[2]
 
1259
        self._write_end()
 
1260
        self._medium_request.finished_writing()
 
1261