/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
from __future__ import absolute_import
22
 
 
23
21
import collections
 
22
from cStringIO import StringIO
24
23
import struct
25
24
import sys
26
 
try:
27
 
    import _thread
28
 
except ImportError:
29
 
    import thread as _thread
 
25
import thread
 
26
import threading
30
27
import time
31
28
 
32
 
import breezy
33
 
from ... import (
 
29
import bzrlib
 
30
from bzrlib import (
34
31
    debug,
35
32
    errors,
36
33
    osutils,
37
34
    )
38
 
from ...sixish import (
39
 
    BytesIO,
40
 
    reraise,
41
 
)
42
 
from . import message, request
43
 
from ...sixish import text_type
44
 
from ...trace import log_exception_quietly, mutter
45
 
from ...bencode import bdecode_as_tuple, bencode
 
35
from bzrlib.smart import message, request
 
36
from bzrlib.trace import log_exception_quietly, mutter
 
37
from bzrlib.bencode import bdecode_as_tuple, bencode
46
38
 
47
39
 
48
40
# Protocol version strings.  These are sent as prefixes of bzr requests and
49
41
# responses to identify the protocol version being used. (There are no version
50
42
# one strings because that version doesn't send any).
51
 
REQUEST_VERSION_TWO = b'bzr request 2\n'
52
 
RESPONSE_VERSION_TWO = b'bzr response 2\n'
 
43
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
44
RESPONSE_VERSION_TWO = 'bzr response 2\n'
53
45
 
54
 
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
 
46
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
55
47
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
56
48
 
57
49
 
61
53
 
62
54
 
63
55
def _decode_tuple(req_line):
64
 
    if req_line is None or req_line == b'':
 
56
    if req_line is None or req_line == '':
65
57
        return None
66
 
    if not req_line.endswith(b'\n'):
 
58
    if req_line[-1] != '\n':
67
59
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
68
 
    return tuple(req_line[:-1].split(b'\x01'))
 
60
    return tuple(req_line[:-1].split('\x01'))
69
61
 
70
62
 
71
63
def _encode_tuple(args):
72
64
    """Encode the tuple args to a bytestream."""
73
 
    for arg in args:
74
 
        if isinstance(arg, text_type):
75
 
            raise TypeError(args)
76
 
    return b'\x01'.join(args) + b'\n'
 
65
    joined = '\x01'.join(args) + '\n'
 
66
    if type(joined) is unicode:
 
67
        # XXX: We should fix things so this never happens!  -AJB, 20100304
 
68
        mutter('response args contain unicode, should be only bytes: %r',
 
69
               joined)
 
70
        joined = joined.encode('ascii')
 
71
    return joined
77
72
 
78
73
 
79
74
class Requester(object):
117
112
    # support multiple chunks?
118
113
    def _encode_bulk_data(self, body):
119
114
        """Encode body as a bulk data chunk."""
120
 
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
 
115
        return ''.join(('%d\n' % len(body), body, 'done\n'))
121
116
 
122
117
    def _serialise_offsets(self, offsets):
123
118
        """Serialise a readv offset list."""
124
119
        txt = []
125
120
        for start, length in offsets:
126
 
            txt.append(b'%d,%d' % (start, length))
127
 
        return b'\n'.join(txt)
 
121
            txt.append('%d,%d' % (start, length))
 
122
        return '\n'.join(txt)
128
123
 
129
124
 
130
125
class SmartServerRequestProtocolOne(SmartProtocolBase):
131
126
    """Server-side encoding and decoding logic for smart version 1."""
132
127
 
133
128
    def __init__(self, backing_transport, write_func, root_client_path='/',
134
 
                 jail_root=None):
 
129
            jail_root=None):
135
130
        self._backing_transport = backing_transport
136
131
        self._root_client_path = root_client_path
137
132
        self._jail_root = jail_root
138
 
        self.unused_data = b''
 
133
        self.unused_data = ''
139
134
        self._finished = False
140
 
        self.in_buffer = b''
 
135
        self.in_buffer = ''
141
136
        self._has_dispatched = False
142
137
        self.request = None
143
138
        self._body_decoder = None
144
139
        self._write_func = write_func
145
140
 
146
 
    def accept_bytes(self, data):
 
141
    def accept_bytes(self, bytes):
147
142
        """Take bytes, and advance the internal state machine appropriately.
148
143
 
149
 
        :param data: must be a byte string
 
144
        :param bytes: must be a byte string
150
145
        """
151
 
        if not isinstance(data, bytes):
152
 
            raise ValueError(data)
153
 
        self.in_buffer += data
 
146
        if not isinstance(bytes, str):
 
147
            raise ValueError(bytes)
 
148
        self.in_buffer += bytes
154
149
        if not self._has_dispatched:
155
 
            if b'\n' not in self.in_buffer:
 
150
            if '\n' not in self.in_buffer:
156
151
                # no command line yet
157
152
                return
158
153
            self._has_dispatched = True
159
154
            try:
160
 
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
161
 
                first_line += b'\n'
 
155
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
156
                first_line += '\n'
162
157
                req_args = _decode_tuple(first_line)
163
158
                self.request = request.SmartServerRequestHandler(
164
159
                    self._backing_transport, commands=request.request_handlers,
168
163
                if self.request.finished_reading:
169
164
                    # trivial request
170
165
                    self.unused_data = self.in_buffer
171
 
                    self.in_buffer = b''
 
166
                    self.in_buffer = ''
172
167
                    self._send_response(self.request.response)
173
168
            except KeyboardInterrupt:
174
169
                raise
175
 
            except errors.UnknownSmartMethod as err:
 
170
            except errors.UnknownSmartMethod, err:
176
171
                protocol_error = errors.SmartProtocolError(
177
 
                    "bad request '%s'" % (err.verb.decode('ascii'),))
 
172
                    "bad request %r" % (err.verb,))
178
173
                failure = request.FailedSmartServerResponse(
179
 
                    (b'error', str(protocol_error).encode('utf-8')))
 
174
                    ('error', str(protocol_error)))
180
175
                self._send_response(failure)
181
176
                return
182
 
            except Exception as exception:
 
177
            except Exception, exception:
183
178
                # everything else: pass to client, flush, and quit
184
179
                log_exception_quietly()
185
180
                self._send_response(request.FailedSmartServerResponse(
186
 
                    (b'error', str(exception).encode('utf-8'))))
 
181
                    ('error', str(exception))))
187
182
                return
188
183
 
189
184
        if self._has_dispatched:
191
186
                # nothing to do.XXX: this routine should be a single state
192
187
                # machine too.
193
188
                self.unused_data += self.in_buffer
194
 
                self.in_buffer = b''
 
189
                self.in_buffer = ''
195
190
                return
196
191
            if self._body_decoder is None:
197
192
                self._body_decoder = LengthPrefixedBodyDecoder()
206
201
            if self.request.response is not None:
207
202
                self._send_response(self.request.response)
208
203
                self.unused_data = self.in_buffer
209
 
                self.in_buffer = b''
 
204
                self.in_buffer = ''
210
205
            else:
211
206
                if self.request.finished_reading:
212
207
                    raise AssertionError(
223
218
        self._write_success_or_failure_prefix(response)
224
219
        self._write_func(_encode_tuple(args))
225
220
        if body is not None:
226
 
            if not isinstance(body, bytes):
 
221
            if not isinstance(body, str):
227
222
                raise ValueError(body)
228
 
            data = self._encode_bulk_data(body)
229
 
            self._write_func(data)
 
223
            bytes = self._encode_bulk_data(body)
 
224
            self._write_func(bytes)
230
225
 
231
226
    def _write_protocol_version(self):
232
227
        """Write any prefixes this protocol requires.
263
258
    def _write_success_or_failure_prefix(self, response):
264
259
        """Write the protocol specific success/failure prefix."""
265
260
        if response.is_successful():
266
 
            self._write_func(b'success\n')
 
261
            self._write_func('success\n')
267
262
        else:
268
 
            self._write_func(b'failed\n')
 
263
            self._write_func('failed\n')
269
264
 
270
265
    def _write_protocol_version(self):
271
266
        r"""Write any prefixes this protocol requires.
283
278
        self._write_success_or_failure_prefix(response)
284
279
        self._write_func(_encode_tuple(response.args))
285
280
        if response.body is not None:
286
 
            if not isinstance(response.body, bytes):
287
 
                raise AssertionError('body must be bytes')
 
281
            if not isinstance(response.body, str):
 
282
                raise AssertionError('body must be a str')
288
283
            if not (response.body_stream is None):
289
284
                raise AssertionError(
290
285
                    'body_stream and body cannot both be set')
291
 
            data = self._encode_bulk_data(response.body)
292
 
            self._write_func(data)
 
286
            bytes = self._encode_bulk_data(response.body)
 
287
            self._write_func(bytes)
293
288
        elif response.body_stream is not None:
294
289
            _send_stream(response.body_stream, self._write_func)
295
290
 
296
291
 
297
292
def _send_stream(stream, write_func):
298
 
    write_func(b'chunked\n')
 
293
    write_func('chunked\n')
299
294
    _send_chunks(stream, write_func)
300
 
    write_func(b'END\n')
 
295
    write_func('END\n')
301
296
 
302
297
 
303
298
def _send_chunks(stream, write_func):
304
299
    for chunk in stream:
305
 
        if isinstance(chunk, bytes):
306
 
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
307
 
            write_func(data)
 
300
        if isinstance(chunk, str):
 
301
            bytes = "%x\n%s" % (len(chunk), chunk)
 
302
            write_func(bytes)
308
303
        elif isinstance(chunk, request.FailedSmartServerResponse):
309
 
            write_func(b'ERR\n')
 
304
            write_func('ERR\n')
310
305
            _send_chunks(chunk.args, write_func)
311
306
            return
312
307
        else:
344
339
        self.finished_reading = False
345
340
        self._in_buffer_list = []
346
341
        self._in_buffer_len = 0
347
 
        self.unused_data = b''
 
342
        self.unused_data = ''
348
343
        self.bytes_left = None
349
344
        self._number_needed_bytes = None
350
345
 
351
346
    def _get_in_buffer(self):
352
347
        if len(self._in_buffer_list) == 1:
353
348
            return self._in_buffer_list[0]
354
 
        in_buffer = b''.join(self._in_buffer_list)
 
349
        in_buffer = ''.join(self._in_buffer_list)
355
350
        if len(in_buffer) != self._in_buffer_len:
356
351
            raise AssertionError(
357
352
                "Length of buffer did not match expected value: %s != %s"
370
365
        # check if we can yield the bytes from just the first entry in our list
371
366
        if len(self._in_buffer_list) == 0:
372
367
            raise AssertionError('Callers must be sure we have buffered bytes'
373
 
                                 ' before calling _get_in_bytes')
 
368
                ' before calling _get_in_bytes')
374
369
        if len(self._in_buffer_list[0]) > count:
375
370
            return self._in_buffer_list[0][:count]
376
371
        # We can't yield it from the first buffer, so collapse all buffers, and
380
375
 
381
376
    def _set_in_buffer(self, new_buf):
382
377
        if new_buf is not None:
383
 
            if not isinstance(new_buf, bytes):
384
 
                raise TypeError(new_buf)
385
378
            self._in_buffer_list = [new_buf]
386
379
            self._in_buffer_len = len(new_buf)
387
380
        else:
388
381
            self._in_buffer_list = []
389
382
            self._in_buffer_len = 0
390
383
 
391
 
    def accept_bytes(self, new_buf):
 
384
    def accept_bytes(self, bytes):
392
385
        """Decode as much of bytes as possible.
393
386
 
394
 
        If 'new_buf' contains too much data it will be appended to
 
387
        If 'bytes' contains too much data it will be appended to
395
388
        self.unused_data.
396
389
 
397
390
        finished_reading will be set when no more data is required.  Further
398
391
        data will be appended to self.unused_data.
399
392
        """
400
 
        if not isinstance(new_buf, bytes):
401
 
            raise TypeError(new_buf)
402
393
        # accept_bytes is allowed to change the state
403
394
        self._number_needed_bytes = None
404
395
        # lsprof puts a very large amount of time on this specific call for
405
396
        # large readv arrays
406
 
        self._in_buffer_list.append(new_buf)
407
 
        self._in_buffer_len += len(new_buf)
 
397
        self._in_buffer_list.append(bytes)
 
398
        self._in_buffer_len += len(bytes)
408
399
        try:
409
400
            # Run the function for the current state.
410
401
            current_state = self.state_accept
417
408
                #     _NeedMoreBytes).
418
409
                current_state = self.state_accept
419
410
                self.state_accept()
420
 
        except _NeedMoreBytes as e:
 
411
        except _NeedMoreBytes, e:
421
412
            self._number_needed_bytes = e.count
422
413
 
423
414
 
467
458
 
468
459
    def _extract_line(self):
469
460
        in_buf = self._get_in_buffer()
470
 
        pos = in_buf.find(b'\n')
 
461
        pos = in_buf.find('\n')
471
462
        if pos == -1:
472
463
            # We haven't read a complete line yet, so request more bytes before
473
464
            # we continue.
474
465
            raise _NeedMoreBytes(1)
475
466
        line = in_buf[:pos]
476
467
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
477
 
        self._set_in_buffer(in_buf[pos + 1:])
 
468
        self._set_in_buffer(in_buf[pos+1:])
478
469
        return line
479
470
 
480
471
    def _finished(self):
490
481
 
491
482
    def _state_accept_expecting_header(self):
492
483
        prefix = self._extract_line()
493
 
        if prefix == b'chunked':
 
484
        if prefix == 'chunked':
494
485
            self.state_accept = self._state_accept_expecting_length
495
486
        else:
496
487
            raise errors.SmartProtocolError(
498
489
 
499
490
    def _state_accept_expecting_length(self):
500
491
        prefix = self._extract_line()
501
 
        if prefix == b'ERR':
 
492
        if prefix == 'ERR':
502
493
            self.error = True
503
494
            self.error_in_progress = []
504
495
            self._state_accept_expecting_length()
505
496
            return
506
 
        elif prefix == b'END':
 
497
        elif prefix == 'END':
507
498
            # We've read the end-of-body marker.
508
499
            # Any further bytes are unused data, including the bytes left in
509
500
            # the _in_buffer.
511
502
            return
512
503
        else:
513
504
            self.bytes_left = int(prefix, 16)
514
 
            self.chunk_in_progress = b''
 
505
            self.chunk_in_progress = ''
515
506
            self.state_accept = self._state_accept_reading_chunk
516
507
 
517
508
    def _state_accept_reading_chunk(self):
542
533
        _StatefulDecoder.__init__(self)
543
534
        self.state_accept = self._state_accept_expecting_length
544
535
        self.state_read = self._state_read_no_data
545
 
        self._body = b''
546
 
        self._trailer_buffer = b''
 
536
        self._body = ''
 
537
        self._trailer_buffer = ''
547
538
 
548
539
    def next_read_size(self):
549
540
        if self.bytes_left is not None:
567
558
 
568
559
    def _state_accept_expecting_length(self):
569
560
        in_buf = self._get_in_buffer()
570
 
        pos = in_buf.find(b'\n')
 
561
        pos = in_buf.find('\n')
571
562
        if pos == -1:
572
563
            return
573
564
        self.bytes_left = int(in_buf[:pos])
574
 
        self._set_in_buffer(in_buf[pos + 1:])
 
565
        self._set_in_buffer(in_buf[pos+1:])
575
566
        self.state_accept = self._state_accept_reading_body
576
567
        self.state_read = self._state_read_body_buffer
577
568
 
593
584
        self._set_in_buffer(None)
594
585
        # TODO: what if the trailer does not match "done\n"?  Should this raise
595
586
        # a ProtocolViolation exception?
596
 
        if self._trailer_buffer.startswith(b'done\n'):
597
 
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
 
587
        if self._trailer_buffer.startswith('done\n'):
 
588
            self.unused_data = self._trailer_buffer[len('done\n'):]
598
589
            self.state_accept = self._state_accept_reading_unused
599
590
            self.finished_reading = True
600
591
 
603
594
        self._set_in_buffer(None)
604
595
 
605
596
    def _state_read_no_data(self):
606
 
        return b''
 
597
        return ''
607
598
 
608
599
    def _state_read_body_buffer(self):
609
600
        result = self._body
610
 
        self._body = b''
 
601
        self._body = ''
611
602
        return result
612
603
 
613
604
 
648
639
        if 'hpss' in debug.debug_flags:
649
640
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
650
641
            if getattr(self._request._medium, '_path', None) is not None:
651
 
                mutter('                  (to %s)',
652
 
                       self._request._medium._path)
 
642
                mutter('                  (to %s)', self._request._medium._path)
653
643
            mutter('              %d bytes', len(body))
654
644
            self._request_start_time = osutils.timer_func()
655
645
            if 'hpssdetail' in debug.debug_flags:
664
654
        """Make a remote call with a readv array.
665
655
 
666
656
        The body is encoded with one line per readv offset pair. The numbers in
667
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
657
        each pair are separated by a comma, and no trailing \n is emitted.
668
658
        """
669
659
        if 'hpss' in debug.debug_flags:
670
660
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
671
661
            if getattr(self._request._medium, '_path', None) is not None:
672
 
                mutter('                  (to %s)',
673
 
                       self._request._medium._path)
 
662
                mutter('                  (to %s)', self._request._medium._path)
674
663
            self._request_start_time = osutils.timer_func()
675
664
        self._write_args(args)
676
665
        readv_bytes = self._serialise_offsets(body)
729
718
        # returned in response to existing version 1 smart requests.  Responses
730
719
        # starting with these codes are always "failed" responses.
731
720
        v1_error_codes = [
732
 
            b'norepository',
733
 
            b'NoSuchFile',
734
 
            b'FileExists',
735
 
            b'DirectoryNotEmpty',
736
 
            b'ShortReadvError',
737
 
            b'UnicodeEncodeError',
738
 
            b'UnicodeDecodeError',
739
 
            b'ReadOnlyError',
740
 
            b'nobranch',
741
 
            b'NoSuchRevision',
742
 
            b'nosuchrevision',
743
 
            b'LockContention',
744
 
            b'UnlockableTransport',
745
 
            b'LockFailed',
746
 
            b'TokenMismatch',
747
 
            b'ReadError',
748
 
            b'PermissionDenied',
 
721
            'norepository',
 
722
            'NoSuchFile',
 
723
            'FileExists',
 
724
            'DirectoryNotEmpty',
 
725
            'ShortReadvError',
 
726
            'UnicodeEncodeError',
 
727
            'UnicodeDecodeError',
 
728
            'ReadOnlyError',
 
729
            'nobranch',
 
730
            'NoSuchRevision',
 
731
            'nosuchrevision',
 
732
            'LockContention',
 
733
            'UnlockableTransport',
 
734
            'LockFailed',
 
735
            'TokenMismatch',
 
736
            'ReadError',
 
737
            'PermissionDenied',
749
738
            ]
750
739
        if result_tuple[0] in v1_error_codes:
751
740
            self._request.finished_reading()
760
749
        :param verb: The verb used in that call.
761
750
        :raises: UnexpectedSmartServerResponse
762
751
        """
763
 
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
764
 
                             b"bad request '" + self._last_verb + b"'")
765
 
            or result_tuple == (b'error', b"Generic bzr smart protocol error: "
766
 
                                b"bad request u'%s'" % self._last_verb)):
 
752
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
 
753
                "bad request '%s'" % self._last_verb) or
 
754
              result_tuple == ('error', "Generic bzr smart protocol error: "
 
755
                "bad request u'%s'" % self._last_verb)):
767
756
            # The response will have no body, so we've finished reading.
768
757
            self._request.finished_reading()
769
758
            raise errors.UnknownSmartMethod(self._last_verb)
780
769
 
781
770
        while not _body_decoder.finished_reading:
782
771
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
783
 
            if bytes == b'':
 
772
            if bytes == '':
784
773
                # end of file encountered reading from server
785
774
                raise errors.ConnectionReset(
786
775
                    "Connection lost while reading response body.")
787
776
            _body_decoder.accept_bytes(bytes)
788
777
        self._request.finished_reading()
789
 
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
 
778
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
790
779
        # XXX: TODO check the trailer result.
791
780
        if 'hpss' in debug.debug_flags:
792
781
            mutter('              %d body bytes read',
799
788
 
800
789
    def query_version(self):
801
790
        """Return protocol version number of the server."""
802
 
        self.call(b'hello')
 
791
        self.call('hello')
803
792
        resp = self.read_response_tuple()
804
 
        if resp == (b'ok', b'1'):
 
793
        if resp == ('ok', '1'):
805
794
            return 1
806
 
        elif resp == (b'ok', b'2'):
 
795
        elif resp == ('ok', '2'):
807
796
            return 2
808
797
        else:
809
798
            raise errors.SmartProtocolError("bad response %r" % (resp,))
841
830
        response_status = self._request.read_line()
842
831
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
843
832
        self._response_is_unknown_method(result)
844
 
        if response_status == b'success\n':
 
833
        if response_status == 'success\n':
845
834
            self.response_status = True
846
835
            if not expect_body:
847
836
                self._request.finished_reading()
848
837
            return result
849
 
        elif response_status == b'failed\n':
 
838
        elif response_status == 'failed\n':
850
839
            self.response_status = False
851
840
            self._request.finished_reading()
852
841
            raise errors.ErrorFromSmartServer(result)
869
858
        _body_decoder = ChunkedBodyDecoder()
870
859
        while not _body_decoder.finished_reading:
871
860
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
872
 
            if bytes == b'':
 
861
            if bytes == '':
873
862
                # end of file encountered reading from server
874
863
                raise errors.ConnectionReset(
875
864
                    "Connection lost while reading streamed body.")
876
865
            _body_decoder.accept_bytes(bytes)
877
866
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
878
 
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
 
867
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
879
868
                    mutter('              %d byte chunk read',
880
869
                           len(body_bytes))
881
870
                yield body_bytes
888
877
        backing_transport, commands=request.request_handlers,
889
878
        root_client_path=root_client_path, jail_root=jail_root)
890
879
    responder = ProtocolThreeResponder(write_func)
891
 
    message_handler = message.ConventionalRequestHandler(
892
 
        request_handler, responder)
 
880
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
893
881
    return ProtocolThreeDecoder(message_handler)
894
882
 
895
883
 
919
907
            _StatefulDecoder.accept_bytes(self, bytes)
920
908
        except KeyboardInterrupt:
921
909
            raise
922
 
        except errors.SmartMessageHandlerError as exception:
 
910
        except errors.SmartMessageHandlerError, exception:
923
911
            # We do *not* set self.decoding_failed here.  The message handler
924
912
            # has raised an error, but the decoder is still able to parse bytes
925
913
            # and determine when this message ends.
929
917
            # The state machine is ready to continue decoding, but the
930
918
            # exception has interrupted the loop that runs the state machine.
931
919
            # So we call accept_bytes again to restart it.
932
 
            self.accept_bytes(b'')
933
 
        except Exception as exception:
 
920
            self.accept_bytes('')
 
921
        except Exception, exception:
934
922
            # The decoder itself has raised an exception.  We cannot continue
935
923
            # decoding.
936
924
            self.decoding_failed = True
977
965
            # The buffer is empty
978
966
            raise _NeedMoreBytes(1)
979
967
        in_buf = self._get_in_buffer()
980
 
        one_byte = in_buf[0:1]
 
968
        one_byte = in_buf[0]
981
969
        self._set_in_buffer(in_buf[1:])
982
970
        return one_byte
983
971
 
1004
992
 
1005
993
    def _state_accept_expecting_headers(self):
1006
994
        decoded = self._extract_prefixed_bencoded_data()
1007
 
        if not isinstance(decoded, dict):
 
995
        if type(decoded) is not dict:
1008
996
            raise errors.SmartProtocolError(
1009
997
                'Header object %r is not a dict' % (decoded,))
1010
998
        self.state_accept = self._state_accept_expecting_message_part
1015
1003
 
1016
1004
    def _state_accept_expecting_message_part(self):
1017
1005
        message_part_kind = self._extract_single_byte()
1018
 
        if message_part_kind == b'o':
 
1006
        if message_part_kind == 'o':
1019
1007
            self.state_accept = self._state_accept_expecting_one_byte
1020
 
        elif message_part_kind == b's':
 
1008
        elif message_part_kind == 's':
1021
1009
            self.state_accept = self._state_accept_expecting_structure
1022
 
        elif message_part_kind == b'b':
 
1010
        elif message_part_kind == 'b':
1023
1011
            self.state_accept = self._state_accept_expecting_bytes
1024
 
        elif message_part_kind == b'e':
 
1012
        elif message_part_kind == 'e':
1025
1013
            self.done()
1026
1014
        else:
1027
1015
            raise errors.SmartProtocolError(
1085
1073
class _ProtocolThreeEncoder(object):
1086
1074
 
1087
1075
    response_marker = request_marker = MESSAGE_VERSION_THREE
1088
 
    BUFFER_SIZE = 1024 * 1024  # 1 MiB buffer before flushing
 
1076
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1089
1077
 
1090
1078
    def __init__(self, write_func):
1091
1079
        self._buf = []
1093
1081
        self._real_write_func = write_func
1094
1082
 
1095
1083
    def _write_func(self, bytes):
 
1084
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1085
        #       for total number of bytes to write, rather than buffer based on
 
1086
        #       the number of write() calls
1096
1087
        # TODO: Another possibility would be to turn this into an async model.
1097
1088
        #       Where we let another thread know that we have some bytes if
1098
1089
        #       they want it, but we don't actually block for it
1105
1096
 
1106
1097
    def flush(self):
1107
1098
        if self._buf:
1108
 
            self._real_write_func(b''.join(self._buf))
 
1099
            self._real_write_func(''.join(self._buf))
1109
1100
            del self._buf[:]
1110
1101
            self._buf_len = 0
1111
1102
 
1113
1104
        """Serialise a readv offset list."""
1114
1105
        txt = []
1115
1106
        for start, length in offsets:
1116
 
            txt.append(b'%d,%d' % (start, length))
1117
 
        return b'\n'.join(txt)
 
1107
            txt.append('%d,%d' % (start, length))
 
1108
        return '\n'.join(txt)
1118
1109
 
1119
1110
    def _write_protocol_version(self):
1120
1111
        self._write_func(MESSAGE_VERSION_THREE)
1128
1119
        self._write_prefixed_bencode(headers)
1129
1120
 
1130
1121
    def _write_structure(self, args):
1131
 
        self._write_func(b's')
 
1122
        self._write_func('s')
1132
1123
        utf8_args = []
1133
1124
        for arg in args:
1134
 
            if isinstance(arg, text_type):
 
1125
            if type(arg) is unicode:
1135
1126
                utf8_args.append(arg.encode('utf8'))
1136
1127
            else:
1137
1128
                utf8_args.append(arg)
1138
1129
        self._write_prefixed_bencode(utf8_args)
1139
1130
 
1140
1131
    def _write_end(self):
1141
 
        self._write_func(b'e')
 
1132
        self._write_func('e')
1142
1133
        self.flush()
1143
1134
 
1144
1135
    def _write_prefixed_body(self, bytes):
1145
 
        self._write_func(b'b')
 
1136
        self._write_func('b')
1146
1137
        self._write_func(struct.pack('!L', len(bytes)))
1147
1138
        self._write_func(bytes)
1148
1139
 
1149
1140
    def _write_chunked_body_start(self):
1150
 
        self._write_func(b'oC')
 
1141
        self._write_func('oC')
1151
1142
 
1152
1143
    def _write_error_status(self):
1153
 
        self._write_func(b'oE')
 
1144
        self._write_func('oE')
1154
1145
 
1155
1146
    def _write_success_status(self):
1156
 
        self._write_func(b'oS')
 
1147
        self._write_func('oS')
1157
1148
 
1158
1149
 
1159
1150
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1161
1152
    def __init__(self, write_func):
1162
1153
        _ProtocolThreeEncoder.__init__(self, write_func)
1163
1154
        self.response_sent = False
1164
 
        self._headers = {
1165
 
            b'Software version': breezy.__version__.encode('utf-8')}
 
1155
        self._headers = {'Software version': bzrlib.__version__}
1166
1156
        if 'hpss' in debug.debug_flags:
1167
 
            self._thread_id = _thread.get_ident()
 
1157
            self._thread_id = thread.get_ident()
1168
1158
            self._response_start_time = None
1169
1159
 
1170
1160
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1190
1180
                % (exception,))
1191
1181
        if isinstance(exception, errors.UnknownSmartMethod):
1192
1182
            failure = request.FailedSmartServerResponse(
1193
 
                (b'UnknownMethod', exception.verb))
 
1183
                ('UnknownMethod', exception.verb))
1194
1184
            self.send_response(failure)
1195
1185
            return
1196
1186
        if 'hpss' in debug.debug_flags:
1199
1189
        self._write_protocol_version()
1200
1190
        self._write_headers(self._headers)
1201
1191
        self._write_error_status()
1202
 
        self._write_structure(
1203
 
            (b'error', str(exception).encode('utf-8', 'replace')))
 
1192
        self._write_structure(('error', str(exception)))
1204
1193
        self._write_end()
1205
1194
 
1206
1195
    def send_response(self, response):
1242
1231
                    if first_chunk is None:
1243
1232
                        first_chunk = chunk
1244
1233
                    self._write_prefixed_body(chunk)
1245
 
                    self.flush()
1246
1234
                    if 'hpssdetail' in debug.debug_flags:
1247
1235
                        # Not worth timing separately, as _write_func is
1248
1236
                        # actually buffered
1285
1273
    iterator = iter(iterable)
1286
1274
    while True:
1287
1275
        try:
1288
 
            yield None, next(iterator)
 
1276
            yield None, iterator.next()
1289
1277
        except StopIteration:
1290
1278
            return
1291
1279
        except (KeyboardInterrupt, SystemExit):
1303
1291
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1304
1292
        self._medium_request = medium_request
1305
1293
        self._headers = {}
1306
 
        self.body_stream_started = None
1307
1294
 
1308
1295
    def set_headers(self, headers):
1309
1296
        self._headers = headers.copy()
1344
1331
        """Make a remote call with a readv array.
1345
1332
 
1346
1333
        The body is encoded with one line per readv offset pair. The numbers in
1347
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
1334
        each pair are separated by a comma, and no trailing \n is emitted.
1348
1335
        """
1349
1336
        if 'hpss' in debug.debug_flags:
1350
1337
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1369
1356
            if path is not None:
1370
1357
                mutter('                  (to %s)', path)
1371
1358
            self._request_start_time = osutils.timer_func()
1372
 
        self.body_stream_started = False
1373
1359
        self._write_protocol_version()
1374
1360
        self._write_headers(self._headers)
1375
1361
        self._write_structure(args)
1377
1363
        #       have finished sending the stream.  We would notice at the end
1378
1364
        #       anyway, but if the medium can deliver it early then it's good
1379
1365
        #       to short-circuit the whole request...
1380
 
        # Provoke any ConnectionReset failures before we start the body stream.
1381
 
        self.flush()
1382
 
        self.body_stream_started = True
1383
1366
        for exc_info, part in _iter_with_errors(stream):
1384
1367
            if exc_info is not None:
1385
1368
                # Iterating the stream failed.  Cleanly abort the request.
1386
1369
                self._write_error_status()
1387
1370
                # Currently the client unconditionally sends ('error',) as the
1388
1371
                # error args.
1389
 
                self._write_structure((b'error',))
 
1372
                self._write_structure(('error',))
1390
1373
                self._write_end()
1391
1374
                self._medium_request.finished_writing()
1392
 
                try:
1393
 
                    reraise(*exc_info)
1394
 
                finally:
1395
 
                    del exc_info
 
1375
                raise exc_info[0], exc_info[1], exc_info[2]
1396
1376
            else:
1397
1377
                self._write_prefixed_body(part)
1398
1378
                self.flush()
1399
1379
        self._write_end()
1400
1380
        self._medium_request.finished_writing()
 
1381