/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/protocol.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-05 01:40:59 UTC
  • mto: This revision was merged to the branch mainline in revision 7480.
  • Revision ID: jelmer@jelmer.uk-20200205014059-1jrhjaphw5vh9i7s
Fix Python 2.7 build.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
import collections
22
 
from cStringIO import StringIO
 
21
from __future__ import absolute_import
 
22
 
 
23
try:
 
24
    from collections.abc import deque
 
25
except ImportError:  # python < 3.7
 
26
    from collections import deque
 
27
 
23
28
import struct
24
29
import sys
25
 
import thread
26
 
import threading
 
30
try:
 
31
    import _thread
 
32
except ImportError:
 
33
    import thread as _thread
27
34
import time
28
35
 
29
 
import bzrlib
30
 
from bzrlib import (
 
36
import breezy
 
37
from ... import (
31
38
    debug,
32
39
    errors,
33
40
    osutils,
34
41
    )
35
 
from bzrlib.smart import message, request
36
 
from bzrlib.trace import log_exception_quietly, mutter
37
 
from bzrlib.bencode import bdecode_as_tuple, bencode
 
42
from ...sixish import (
 
43
    BytesIO,
 
44
    reraise,
 
45
)
 
46
from . import message, request
 
47
from ...sixish import text_type
 
48
from ...trace import log_exception_quietly, mutter
 
49
from ...bencode import bdecode_as_tuple, bencode
38
50
 
39
51
 
40
52
# Protocol version strings.  These are sent as prefixes of bzr requests and
41
53
# responses to identify the protocol version being used. (There are no version
42
54
# one strings because that version doesn't send any).
43
 
REQUEST_VERSION_TWO = 'bzr request 2\n'
44
 
RESPONSE_VERSION_TWO = 'bzr response 2\n'
 
55
REQUEST_VERSION_TWO = b'bzr request 2\n'
 
56
RESPONSE_VERSION_TWO = b'bzr response 2\n'
45
57
 
46
 
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
 
58
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
47
59
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
48
60
 
49
61
 
53
65
 
54
66
 
55
67
def _decode_tuple(req_line):
56
 
    if req_line is None or req_line == '':
 
68
    if req_line is None or req_line == b'':
57
69
        return None
58
 
    if req_line[-1] != '\n':
 
70
    if not req_line.endswith(b'\n'):
59
71
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
60
 
    return tuple(req_line[:-1].split('\x01'))
 
72
    return tuple(req_line[:-1].split(b'\x01'))
61
73
 
62
74
 
63
75
def _encode_tuple(args):
64
76
    """Encode the tuple args to a bytestream."""
65
 
    joined = '\x01'.join(args) + '\n'
66
 
    if type(joined) is unicode:
67
 
        # XXX: We should fix things so this never happens!  -AJB, 20100304
68
 
        mutter('response args contain unicode, should be only bytes: %r',
69
 
               joined)
70
 
        joined = joined.encode('ascii')
71
 
    return joined
 
77
    for arg in args:
 
78
        if isinstance(arg, text_type):
 
79
            raise TypeError(args)
 
80
    return b'\x01'.join(args) + b'\n'
72
81
 
73
82
 
74
83
class Requester(object):
112
121
    # support multiple chunks?
113
122
    def _encode_bulk_data(self, body):
114
123
        """Encode body as a bulk data chunk."""
115
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
124
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
116
125
 
117
126
    def _serialise_offsets(self, offsets):
118
127
        """Serialise a readv offset list."""
119
128
        txt = []
120
129
        for start, length in offsets:
121
 
            txt.append('%d,%d' % (start, length))
122
 
        return '\n'.join(txt)
 
130
            txt.append(b'%d,%d' % (start, length))
 
131
        return b'\n'.join(txt)
123
132
 
124
133
 
125
134
class SmartServerRequestProtocolOne(SmartProtocolBase):
126
135
    """Server-side encoding and decoding logic for smart version 1."""
127
136
 
128
137
    def __init__(self, backing_transport, write_func, root_client_path='/',
129
 
            jail_root=None):
 
138
                 jail_root=None):
130
139
        self._backing_transport = backing_transport
131
140
        self._root_client_path = root_client_path
132
141
        self._jail_root = jail_root
133
 
        self.unused_data = ''
 
142
        self.unused_data = b''
134
143
        self._finished = False
135
 
        self.in_buffer = ''
 
144
        self.in_buffer = b''
136
145
        self._has_dispatched = False
137
146
        self.request = None
138
147
        self._body_decoder = None
139
148
        self._write_func = write_func
140
149
 
141
 
    def accept_bytes(self, bytes):
 
150
    def accept_bytes(self, data):
142
151
        """Take bytes, and advance the internal state machine appropriately.
143
152
 
144
 
        :param bytes: must be a byte string
 
153
        :param data: must be a byte string
145
154
        """
146
 
        if not isinstance(bytes, str):
147
 
            raise ValueError(bytes)
148
 
        self.in_buffer += bytes
 
155
        if not isinstance(data, bytes):
 
156
            raise ValueError(data)
 
157
        self.in_buffer += data
149
158
        if not self._has_dispatched:
150
 
            if '\n' not in self.in_buffer:
 
159
            if b'\n' not in self.in_buffer:
151
160
                # no command line yet
152
161
                return
153
162
            self._has_dispatched = True
154
163
            try:
155
 
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
156
 
                first_line += '\n'
 
164
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
 
165
                first_line += b'\n'
157
166
                req_args = _decode_tuple(first_line)
158
167
                self.request = request.SmartServerRequestHandler(
159
168
                    self._backing_transport, commands=request.request_handlers,
163
172
                if self.request.finished_reading:
164
173
                    # trivial request
165
174
                    self.unused_data = self.in_buffer
166
 
                    self.in_buffer = ''
 
175
                    self.in_buffer = b''
167
176
                    self._send_response(self.request.response)
168
177
            except KeyboardInterrupt:
169
178
                raise
170
 
            except errors.UnknownSmartMethod, err:
 
179
            except errors.UnknownSmartMethod as err:
171
180
                protocol_error = errors.SmartProtocolError(
172
 
                    "bad request %r" % (err.verb,))
 
181
                    "bad request '%s'" % (err.verb.decode('ascii'),))
173
182
                failure = request.FailedSmartServerResponse(
174
 
                    ('error', str(protocol_error)))
 
183
                    (b'error', str(protocol_error).encode('utf-8')))
175
184
                self._send_response(failure)
176
185
                return
177
 
            except Exception, exception:
 
186
            except Exception as exception:
178
187
                # everything else: pass to client, flush, and quit
179
188
                log_exception_quietly()
180
189
                self._send_response(request.FailedSmartServerResponse(
181
 
                    ('error', str(exception))))
 
190
                    (b'error', str(exception).encode('utf-8'))))
182
191
                return
183
192
 
184
193
        if self._has_dispatched:
186
195
                # nothing to do.XXX: this routine should be a single state
187
196
                # machine too.
188
197
                self.unused_data += self.in_buffer
189
 
                self.in_buffer = ''
 
198
                self.in_buffer = b''
190
199
                return
191
200
            if self._body_decoder is None:
192
201
                self._body_decoder = LengthPrefixedBodyDecoder()
201
210
            if self.request.response is not None:
202
211
                self._send_response(self.request.response)
203
212
                self.unused_data = self.in_buffer
204
 
                self.in_buffer = ''
 
213
                self.in_buffer = b''
205
214
            else:
206
215
                if self.request.finished_reading:
207
216
                    raise AssertionError(
218
227
        self._write_success_or_failure_prefix(response)
219
228
        self._write_func(_encode_tuple(args))
220
229
        if body is not None:
221
 
            if not isinstance(body, str):
 
230
            if not isinstance(body, bytes):
222
231
                raise ValueError(body)
223
 
            bytes = self._encode_bulk_data(body)
224
 
            self._write_func(bytes)
 
232
            data = self._encode_bulk_data(body)
 
233
            self._write_func(data)
225
234
 
226
235
    def _write_protocol_version(self):
227
236
        """Write any prefixes this protocol requires.
258
267
    def _write_success_or_failure_prefix(self, response):
259
268
        """Write the protocol specific success/failure prefix."""
260
269
        if response.is_successful():
261
 
            self._write_func('success\n')
 
270
            self._write_func(b'success\n')
262
271
        else:
263
 
            self._write_func('failed\n')
 
272
            self._write_func(b'failed\n')
264
273
 
265
274
    def _write_protocol_version(self):
266
275
        r"""Write any prefixes this protocol requires.
278
287
        self._write_success_or_failure_prefix(response)
279
288
        self._write_func(_encode_tuple(response.args))
280
289
        if response.body is not None:
281
 
            if not isinstance(response.body, str):
282
 
                raise AssertionError('body must be a str')
 
290
            if not isinstance(response.body, bytes):
 
291
                raise AssertionError('body must be bytes')
283
292
            if not (response.body_stream is None):
284
293
                raise AssertionError(
285
294
                    'body_stream and body cannot both be set')
286
 
            bytes = self._encode_bulk_data(response.body)
287
 
            self._write_func(bytes)
 
295
            data = self._encode_bulk_data(response.body)
 
296
            self._write_func(data)
288
297
        elif response.body_stream is not None:
289
298
            _send_stream(response.body_stream, self._write_func)
290
299
 
291
300
 
292
301
def _send_stream(stream, write_func):
293
 
    write_func('chunked\n')
 
302
    write_func(b'chunked\n')
294
303
    _send_chunks(stream, write_func)
295
 
    write_func('END\n')
 
304
    write_func(b'END\n')
296
305
 
297
306
 
298
307
def _send_chunks(stream, write_func):
299
308
    for chunk in stream:
300
 
        if isinstance(chunk, str):
301
 
            bytes = "%x\n%s" % (len(chunk), chunk)
302
 
            write_func(bytes)
 
309
        if isinstance(chunk, bytes):
 
310
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
 
311
            write_func(data)
303
312
        elif isinstance(chunk, request.FailedSmartServerResponse):
304
 
            write_func('ERR\n')
 
313
            write_func(b'ERR\n')
305
314
            _send_chunks(chunk.args, write_func)
306
315
            return
307
316
        else:
339
348
        self.finished_reading = False
340
349
        self._in_buffer_list = []
341
350
        self._in_buffer_len = 0
342
 
        self.unused_data = ''
 
351
        self.unused_data = b''
343
352
        self.bytes_left = None
344
353
        self._number_needed_bytes = None
345
354
 
346
355
    def _get_in_buffer(self):
347
356
        if len(self._in_buffer_list) == 1:
348
357
            return self._in_buffer_list[0]
349
 
        in_buffer = ''.join(self._in_buffer_list)
 
358
        in_buffer = b''.join(self._in_buffer_list)
350
359
        if len(in_buffer) != self._in_buffer_len:
351
360
            raise AssertionError(
352
361
                "Length of buffer did not match expected value: %s != %s"
365
374
        # check if we can yield the bytes from just the first entry in our list
366
375
        if len(self._in_buffer_list) == 0:
367
376
            raise AssertionError('Callers must be sure we have buffered bytes'
368
 
                ' before calling _get_in_bytes')
 
377
                                 ' before calling _get_in_bytes')
369
378
        if len(self._in_buffer_list[0]) > count:
370
379
            return self._in_buffer_list[0][:count]
371
380
        # We can't yield it from the first buffer, so collapse all buffers, and
375
384
 
376
385
    def _set_in_buffer(self, new_buf):
377
386
        if new_buf is not None:
 
387
            if not isinstance(new_buf, bytes):
 
388
                raise TypeError(new_buf)
378
389
            self._in_buffer_list = [new_buf]
379
390
            self._in_buffer_len = len(new_buf)
380
391
        else:
381
392
            self._in_buffer_list = []
382
393
            self._in_buffer_len = 0
383
394
 
384
 
    def accept_bytes(self, bytes):
 
395
    def accept_bytes(self, new_buf):
385
396
        """Decode as much of bytes as possible.
386
397
 
387
 
        If 'bytes' contains too much data it will be appended to
 
398
        If 'new_buf' contains too much data it will be appended to
388
399
        self.unused_data.
389
400
 
390
401
        finished_reading will be set when no more data is required.  Further
391
402
        data will be appended to self.unused_data.
392
403
        """
 
404
        if not isinstance(new_buf, bytes):
 
405
            raise TypeError(new_buf)
393
406
        # accept_bytes is allowed to change the state
394
407
        self._number_needed_bytes = None
395
408
        # lsprof puts a very large amount of time on this specific call for
396
409
        # large readv arrays
397
 
        self._in_buffer_list.append(bytes)
398
 
        self._in_buffer_len += len(bytes)
 
410
        self._in_buffer_list.append(new_buf)
 
411
        self._in_buffer_len += len(new_buf)
399
412
        try:
400
413
            # Run the function for the current state.
401
414
            current_state = self.state_accept
408
421
                #     _NeedMoreBytes).
409
422
                current_state = self.state_accept
410
423
                self.state_accept()
411
 
        except _NeedMoreBytes, e:
 
424
        except _NeedMoreBytes as e:
412
425
            self._number_needed_bytes = e.count
413
426
 
414
427
 
423
436
        _StatefulDecoder.__init__(self)
424
437
        self.state_accept = self._state_accept_expecting_header
425
438
        self.chunk_in_progress = None
426
 
        self.chunks = collections.deque()
 
439
        self.chunks = deque()
427
440
        self.error = False
428
441
        self.error_in_progress = None
429
442
 
458
471
 
459
472
    def _extract_line(self):
460
473
        in_buf = self._get_in_buffer()
461
 
        pos = in_buf.find('\n')
 
474
        pos = in_buf.find(b'\n')
462
475
        if pos == -1:
463
476
            # We haven't read a complete line yet, so request more bytes before
464
477
            # we continue.
465
478
            raise _NeedMoreBytes(1)
466
479
        line = in_buf[:pos]
467
480
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
468
 
        self._set_in_buffer(in_buf[pos+1:])
 
481
        self._set_in_buffer(in_buf[pos + 1:])
469
482
        return line
470
483
 
471
484
    def _finished(self):
481
494
 
482
495
    def _state_accept_expecting_header(self):
483
496
        prefix = self._extract_line()
484
 
        if prefix == 'chunked':
 
497
        if prefix == b'chunked':
485
498
            self.state_accept = self._state_accept_expecting_length
486
499
        else:
487
500
            raise errors.SmartProtocolError(
489
502
 
490
503
    def _state_accept_expecting_length(self):
491
504
        prefix = self._extract_line()
492
 
        if prefix == 'ERR':
 
505
        if prefix == b'ERR':
493
506
            self.error = True
494
507
            self.error_in_progress = []
495
508
            self._state_accept_expecting_length()
496
509
            return
497
 
        elif prefix == 'END':
 
510
        elif prefix == b'END':
498
511
            # We've read the end-of-body marker.
499
512
            # Any further bytes are unused data, including the bytes left in
500
513
            # the _in_buffer.
502
515
            return
503
516
        else:
504
517
            self.bytes_left = int(prefix, 16)
505
 
            self.chunk_in_progress = ''
 
518
            self.chunk_in_progress = b''
506
519
            self.state_accept = self._state_accept_reading_chunk
507
520
 
508
521
    def _state_accept_reading_chunk(self):
533
546
        _StatefulDecoder.__init__(self)
534
547
        self.state_accept = self._state_accept_expecting_length
535
548
        self.state_read = self._state_read_no_data
536
 
        self._body = ''
537
 
        self._trailer_buffer = ''
 
549
        self._body = b''
 
550
        self._trailer_buffer = b''
538
551
 
539
552
    def next_read_size(self):
540
553
        if self.bytes_left is not None:
558
571
 
559
572
    def _state_accept_expecting_length(self):
560
573
        in_buf = self._get_in_buffer()
561
 
        pos = in_buf.find('\n')
 
574
        pos = in_buf.find(b'\n')
562
575
        if pos == -1:
563
576
            return
564
577
        self.bytes_left = int(in_buf[:pos])
565
 
        self._set_in_buffer(in_buf[pos+1:])
 
578
        self._set_in_buffer(in_buf[pos + 1:])
566
579
        self.state_accept = self._state_accept_reading_body
567
580
        self.state_read = self._state_read_body_buffer
568
581
 
584
597
        self._set_in_buffer(None)
585
598
        # TODO: what if the trailer does not match "done\n"?  Should this raise
586
599
        # a ProtocolViolation exception?
587
 
        if self._trailer_buffer.startswith('done\n'):
588
 
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
600
        if self._trailer_buffer.startswith(b'done\n'):
 
601
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
589
602
            self.state_accept = self._state_accept_reading_unused
590
603
            self.finished_reading = True
591
604
 
594
607
        self._set_in_buffer(None)
595
608
 
596
609
    def _state_read_no_data(self):
597
 
        return ''
 
610
        return b''
598
611
 
599
612
    def _state_read_body_buffer(self):
600
613
        result = self._body
601
 
        self._body = ''
 
614
        self._body = b''
602
615
        return result
603
616
 
604
617
 
626
639
            mutter('hpss call:   %s', repr(args)[1:-1])
627
640
            if getattr(self._request._medium, 'base', None) is not None:
628
641
                mutter('             (to %s)', self._request._medium.base)
629
 
            self._request_start_time = osutils.timer_func()
 
642
            self._request_start_time = osutils.perf_counter()
630
643
        self._write_args(args)
631
644
        self._request.finished_writing()
632
645
        self._last_verb = args[0]
639
652
        if 'hpss' in debug.debug_flags:
640
653
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
641
654
            if getattr(self._request._medium, '_path', None) is not None:
642
 
                mutter('                  (to %s)', self._request._medium._path)
 
655
                mutter('                  (to %s)',
 
656
                       self._request._medium._path)
643
657
            mutter('              %d bytes', len(body))
644
 
            self._request_start_time = osutils.timer_func()
 
658
            self._request_start_time = osutils.perf_counter()
645
659
            if 'hpssdetail' in debug.debug_flags:
646
660
                mutter('hpss body content: %s', body)
647
661
        self._write_args(args)
654
668
        """Make a remote call with a readv array.
655
669
 
656
670
        The body is encoded with one line per readv offset pair. The numbers in
657
 
        each pair are separated by a comma, and no trailing \n is emitted.
 
671
        each pair are separated by a comma, and no trailing \\n is emitted.
658
672
        """
659
673
        if 'hpss' in debug.debug_flags:
660
674
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
661
675
            if getattr(self._request._medium, '_path', None) is not None:
662
 
                mutter('                  (to %s)', self._request._medium._path)
663
 
            self._request_start_time = osutils.timer_func()
 
676
                mutter('                  (to %s)',
 
677
                       self._request._medium._path)
 
678
            self._request_start_time = osutils.perf_counter()
664
679
        self._write_args(args)
665
680
        readv_bytes = self._serialise_offsets(body)
666
681
        bytes = self._encode_bulk_data(readv_bytes)
692
707
        if 'hpss' in debug.debug_flags:
693
708
            if self._request_start_time is not None:
694
709
                mutter('   result:   %6.3fs  %s',
695
 
                       osutils.timer_func() - self._request_start_time,
 
710
                       osutils.perf_counter() - self._request_start_time,
696
711
                       repr(result)[1:-1])
697
712
                self._request_start_time = None
698
713
            else:
718
733
        # returned in response to existing version 1 smart requests.  Responses
719
734
        # starting with these codes are always "failed" responses.
720
735
        v1_error_codes = [
721
 
            'norepository',
722
 
            'NoSuchFile',
723
 
            'FileExists',
724
 
            'DirectoryNotEmpty',
725
 
            'ShortReadvError',
726
 
            'UnicodeEncodeError',
727
 
            'UnicodeDecodeError',
728
 
            'ReadOnlyError',
729
 
            'nobranch',
730
 
            'NoSuchRevision',
731
 
            'nosuchrevision',
732
 
            'LockContention',
733
 
            'UnlockableTransport',
734
 
            'LockFailed',
735
 
            'TokenMismatch',
736
 
            'ReadError',
737
 
            'PermissionDenied',
 
736
            b'norepository',
 
737
            b'NoSuchFile',
 
738
            b'FileExists',
 
739
            b'DirectoryNotEmpty',
 
740
            b'ShortReadvError',
 
741
            b'UnicodeEncodeError',
 
742
            b'UnicodeDecodeError',
 
743
            b'ReadOnlyError',
 
744
            b'nobranch',
 
745
            b'NoSuchRevision',
 
746
            b'nosuchrevision',
 
747
            b'LockContention',
 
748
            b'UnlockableTransport',
 
749
            b'LockFailed',
 
750
            b'TokenMismatch',
 
751
            b'ReadError',
 
752
            b'PermissionDenied',
738
753
            ]
739
754
        if result_tuple[0] in v1_error_codes:
740
755
            self._request.finished_reading()
749
764
        :param verb: The verb used in that call.
750
765
        :raises: UnexpectedSmartServerResponse
751
766
        """
752
 
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
753
 
                "bad request '%s'" % self._last_verb) or
754
 
              result_tuple == ('error', "Generic bzr smart protocol error: "
755
 
                "bad request u'%s'" % self._last_verb)):
 
767
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
 
768
                             b"bad request '" + self._last_verb + b"'")
 
769
            or result_tuple == (b'error', b"Generic bzr smart protocol error: "
 
770
                                b"bad request u'%s'" % self._last_verb)):
756
771
            # The response will have no body, so we've finished reading.
757
772
            self._request.finished_reading()
758
773
            raise errors.UnknownSmartMethod(self._last_verb)
769
784
 
770
785
        while not _body_decoder.finished_reading:
771
786
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
772
 
            if bytes == '':
 
787
            if bytes == b'':
773
788
                # end of file encountered reading from server
774
789
                raise errors.ConnectionReset(
775
790
                    "Connection lost while reading response body.")
776
791
            _body_decoder.accept_bytes(bytes)
777
792
        self._request.finished_reading()
778
 
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
793
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
779
794
        # XXX: TODO check the trailer result.
780
795
        if 'hpss' in debug.debug_flags:
781
796
            mutter('              %d body bytes read',
788
803
 
789
804
    def query_version(self):
790
805
        """Return protocol version number of the server."""
791
 
        self.call('hello')
 
806
        self.call(b'hello')
792
807
        resp = self.read_response_tuple()
793
 
        if resp == ('ok', '1'):
 
808
        if resp == (b'ok', b'1'):
794
809
            return 1
795
 
        elif resp == ('ok', '2'):
 
810
        elif resp == (b'ok', b'2'):
796
811
            return 2
797
812
        else:
798
813
            raise errors.SmartProtocolError("bad response %r" % (resp,))
830
845
        response_status = self._request.read_line()
831
846
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
832
847
        self._response_is_unknown_method(result)
833
 
        if response_status == 'success\n':
 
848
        if response_status == b'success\n':
834
849
            self.response_status = True
835
850
            if not expect_body:
836
851
                self._request.finished_reading()
837
852
            return result
838
 
        elif response_status == 'failed\n':
 
853
        elif response_status == b'failed\n':
839
854
            self.response_status = False
840
855
            self._request.finished_reading()
841
856
            raise errors.ErrorFromSmartServer(result)
858
873
        _body_decoder = ChunkedBodyDecoder()
859
874
        while not _body_decoder.finished_reading:
860
875
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
861
 
            if bytes == '':
 
876
            if bytes == b'':
862
877
                # end of file encountered reading from server
863
878
                raise errors.ConnectionReset(
864
879
                    "Connection lost while reading streamed body.")
865
880
            _body_decoder.accept_bytes(bytes)
866
881
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
867
 
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
 
882
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
868
883
                    mutter('              %d byte chunk read',
869
884
                           len(body_bytes))
870
885
                yield body_bytes
877
892
        backing_transport, commands=request.request_handlers,
878
893
        root_client_path=root_client_path, jail_root=jail_root)
879
894
    responder = ProtocolThreeResponder(write_func)
880
 
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
 
895
    message_handler = message.ConventionalRequestHandler(
 
896
        request_handler, responder)
881
897
    return ProtocolThreeDecoder(message_handler)
882
898
 
883
899
 
907
923
            _StatefulDecoder.accept_bytes(self, bytes)
908
924
        except KeyboardInterrupt:
909
925
            raise
910
 
        except errors.SmartMessageHandlerError, exception:
 
926
        except errors.SmartMessageHandlerError as exception:
911
927
            # We do *not* set self.decoding_failed here.  The message handler
912
928
            # has raised an error, but the decoder is still able to parse bytes
913
929
            # and determine when this message ends.
917
933
            # The state machine is ready to continue decoding, but the
918
934
            # exception has interrupted the loop that runs the state machine.
919
935
            # So we call accept_bytes again to restart it.
920
 
            self.accept_bytes('')
921
 
        except Exception, exception:
 
936
            self.accept_bytes(b'')
 
937
        except Exception as exception:
922
938
            # The decoder itself has raised an exception.  We cannot continue
923
939
            # decoding.
924
940
            self.decoding_failed = True
965
981
            # The buffer is empty
966
982
            raise _NeedMoreBytes(1)
967
983
        in_buf = self._get_in_buffer()
968
 
        one_byte = in_buf[0]
 
984
        one_byte = in_buf[0:1]
969
985
        self._set_in_buffer(in_buf[1:])
970
986
        return one_byte
971
987
 
992
1008
 
993
1009
    def _state_accept_expecting_headers(self):
994
1010
        decoded = self._extract_prefixed_bencoded_data()
995
 
        if type(decoded) is not dict:
 
1011
        if not isinstance(decoded, dict):
996
1012
            raise errors.SmartProtocolError(
997
1013
                'Header object %r is not a dict' % (decoded,))
998
1014
        self.state_accept = self._state_accept_expecting_message_part
1003
1019
 
1004
1020
    def _state_accept_expecting_message_part(self):
1005
1021
        message_part_kind = self._extract_single_byte()
1006
 
        if message_part_kind == 'o':
 
1022
        if message_part_kind == b'o':
1007
1023
            self.state_accept = self._state_accept_expecting_one_byte
1008
 
        elif message_part_kind == 's':
 
1024
        elif message_part_kind == b's':
1009
1025
            self.state_accept = self._state_accept_expecting_structure
1010
 
        elif message_part_kind == 'b':
 
1026
        elif message_part_kind == b'b':
1011
1027
            self.state_accept = self._state_accept_expecting_bytes
1012
 
        elif message_part_kind == 'e':
 
1028
        elif message_part_kind == b'e':
1013
1029
            self.done()
1014
1030
        else:
1015
1031
            raise errors.SmartProtocolError(
1073
1089
class _ProtocolThreeEncoder(object):
1074
1090
 
1075
1091
    response_marker = request_marker = MESSAGE_VERSION_THREE
1076
 
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
 
1092
    BUFFER_SIZE = 1024 * 1024  # 1 MiB buffer before flushing
1077
1093
 
1078
1094
    def __init__(self, write_func):
1079
1095
        self._buf = []
1081
1097
        self._real_write_func = write_func
1082
1098
 
1083
1099
    def _write_func(self, bytes):
1084
 
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
1085
 
        #       for total number of bytes to write, rather than buffer based on
1086
 
        #       the number of write() calls
1087
1100
        # TODO: Another possibility would be to turn this into an async model.
1088
1101
        #       Where we let another thread know that we have some bytes if
1089
1102
        #       they want it, but we don't actually block for it
1096
1109
 
1097
1110
    def flush(self):
1098
1111
        if self._buf:
1099
 
            self._real_write_func(''.join(self._buf))
 
1112
            self._real_write_func(b''.join(self._buf))
1100
1113
            del self._buf[:]
1101
1114
            self._buf_len = 0
1102
1115
 
1104
1117
        """Serialise a readv offset list."""
1105
1118
        txt = []
1106
1119
        for start, length in offsets:
1107
 
            txt.append('%d,%d' % (start, length))
1108
 
        return '\n'.join(txt)
 
1120
            txt.append(b'%d,%d' % (start, length))
 
1121
        return b'\n'.join(txt)
1109
1122
 
1110
1123
    def _write_protocol_version(self):
1111
1124
        self._write_func(MESSAGE_VERSION_THREE)
1119
1132
        self._write_prefixed_bencode(headers)
1120
1133
 
1121
1134
    def _write_structure(self, args):
1122
 
        self._write_func('s')
 
1135
        self._write_func(b's')
1123
1136
        utf8_args = []
1124
1137
        for arg in args:
1125
 
            if type(arg) is unicode:
 
1138
            if isinstance(arg, text_type):
1126
1139
                utf8_args.append(arg.encode('utf8'))
1127
1140
            else:
1128
1141
                utf8_args.append(arg)
1129
1142
        self._write_prefixed_bencode(utf8_args)
1130
1143
 
1131
1144
    def _write_end(self):
1132
 
        self._write_func('e')
 
1145
        self._write_func(b'e')
1133
1146
        self.flush()
1134
1147
 
1135
1148
    def _write_prefixed_body(self, bytes):
1136
 
        self._write_func('b')
 
1149
        self._write_func(b'b')
1137
1150
        self._write_func(struct.pack('!L', len(bytes)))
1138
1151
        self._write_func(bytes)
1139
1152
 
1140
1153
    def _write_chunked_body_start(self):
1141
 
        self._write_func('oC')
 
1154
        self._write_func(b'oC')
1142
1155
 
1143
1156
    def _write_error_status(self):
1144
 
        self._write_func('oE')
 
1157
        self._write_func(b'oE')
1145
1158
 
1146
1159
    def _write_success_status(self):
1147
 
        self._write_func('oS')
 
1160
        self._write_func(b'oS')
1148
1161
 
1149
1162
 
1150
1163
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1152
1165
    def __init__(self, write_func):
1153
1166
        _ProtocolThreeEncoder.__init__(self, write_func)
1154
1167
        self.response_sent = False
1155
 
        self._headers = {'Software version': bzrlib.__version__}
 
1168
        self._headers = {
 
1169
            b'Software version': breezy.__version__.encode('utf-8')}
1156
1170
        if 'hpss' in debug.debug_flags:
1157
 
            self._thread_id = thread.get_ident()
 
1171
            self._thread_id = _thread.get_ident()
1158
1172
            self._response_start_time = None
1159
1173
 
1160
1174
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1161
1175
        if self._response_start_time is None:
1162
 
            self._response_start_time = osutils.timer_func()
 
1176
            self._response_start_time = osutils.perf_counter()
1163
1177
        if include_time:
1164
 
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
 
1178
            t = '%5.3fs ' % (osutils.perf_counter() - self._response_start_time)
1165
1179
        else:
1166
1180
            t = ''
1167
1181
        if extra_bytes is None:
1180
1194
                % (exception,))
1181
1195
        if isinstance(exception, errors.UnknownSmartMethod):
1182
1196
            failure = request.FailedSmartServerResponse(
1183
 
                ('UnknownMethod', exception.verb))
 
1197
                (b'UnknownMethod', exception.verb))
1184
1198
            self.send_response(failure)
1185
1199
            return
1186
1200
        if 'hpss' in debug.debug_flags:
1189
1203
        self._write_protocol_version()
1190
1204
        self._write_headers(self._headers)
1191
1205
        self._write_error_status()
1192
 
        self._write_structure(('error', str(exception)))
 
1206
        self._write_structure(
 
1207
            (b'error', str(exception).encode('utf-8', 'replace')))
1193
1208
        self._write_end()
1194
1209
 
1195
1210
    def send_response(self, response):
1231
1246
                    if first_chunk is None:
1232
1247
                        first_chunk = chunk
1233
1248
                    self._write_prefixed_body(chunk)
 
1249
                    self.flush()
1234
1250
                    if 'hpssdetail' in debug.debug_flags:
1235
1251
                        # Not worth timing separately, as _write_func is
1236
1252
                        # actually buffered
1273
1289
    iterator = iter(iterable)
1274
1290
    while True:
1275
1291
        try:
1276
 
            yield None, iterator.next()
 
1292
            yield None, next(iterator)
1277
1293
        except StopIteration:
1278
1294
            return
1279
1295
        except (KeyboardInterrupt, SystemExit):
1291
1307
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1292
1308
        self._medium_request = medium_request
1293
1309
        self._headers = {}
 
1310
        self.body_stream_started = None
1294
1311
 
1295
1312
    def set_headers(self, headers):
1296
1313
        self._headers = headers.copy()
1301
1318
            base = getattr(self._medium_request._medium, 'base', None)
1302
1319
            if base is not None:
1303
1320
                mutter('             (to %s)', base)
1304
 
            self._request_start_time = osutils.timer_func()
 
1321
            self._request_start_time = osutils.perf_counter()
1305
1322
        self._write_protocol_version()
1306
1323
        self._write_headers(self._headers)
1307
1324
        self._write_structure(args)
1319
1336
            if path is not None:
1320
1337
                mutter('                  (to %s)', path)
1321
1338
            mutter('              %d bytes', len(body))
1322
 
            self._request_start_time = osutils.timer_func()
 
1339
            self._request_start_time = osutils.perf_counter()
1323
1340
        self._write_protocol_version()
1324
1341
        self._write_headers(self._headers)
1325
1342
        self._write_structure(args)
1331
1348
        """Make a remote call with a readv array.
1332
1349
 
1333
1350
        The body is encoded with one line per readv offset pair. The numbers in
1334
 
        each pair are separated by a comma, and no trailing \n is emitted.
 
1351
        each pair are separated by a comma, and no trailing \\n is emitted.
1335
1352
        """
1336
1353
        if 'hpss' in debug.debug_flags:
1337
1354
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1338
1355
            path = getattr(self._medium_request._medium, '_path', None)
1339
1356
            if path is not None:
1340
1357
                mutter('                  (to %s)', path)
1341
 
            self._request_start_time = osutils.timer_func()
 
1358
            self._request_start_time = osutils.perf_counter()
1342
1359
        self._write_protocol_version()
1343
1360
        self._write_headers(self._headers)
1344
1361
        self._write_structure(args)
1355
1372
            path = getattr(self._medium_request._medium, '_path', None)
1356
1373
            if path is not None:
1357
1374
                mutter('                  (to %s)', path)
1358
 
            self._request_start_time = osutils.timer_func()
 
1375
            self._request_start_time = osutils.perf_counter()
 
1376
        self.body_stream_started = False
1359
1377
        self._write_protocol_version()
1360
1378
        self._write_headers(self._headers)
1361
1379
        self._write_structure(args)
1363
1381
        #       have finished sending the stream.  We would notice at the end
1364
1382
        #       anyway, but if the medium can deliver it early then it's good
1365
1383
        #       to short-circuit the whole request...
 
1384
        # Provoke any ConnectionReset failures before we start the body stream.
 
1385
        self.flush()
 
1386
        self.body_stream_started = True
1366
1387
        for exc_info, part in _iter_with_errors(stream):
1367
1388
            if exc_info is not None:
1368
1389
                # Iterating the stream failed.  Cleanly abort the request.
1369
1390
                self._write_error_status()
1370
1391
                # Currently the client unconditionally sends ('error',) as the
1371
1392
                # error args.
1372
 
                self._write_structure(('error',))
 
1393
                self._write_structure((b'error',))
1373
1394
                self._write_end()
1374
1395
                self._medium_request.finished_writing()
1375
 
                raise exc_info[0], exc_info[1], exc_info[2]
 
1396
                try:
 
1397
                    reraise(*exc_info)
 
1398
                finally:
 
1399
                    del exc_info
1376
1400
            else:
1377
1401
                self._write_prefixed_body(part)
1378
1402
                self.flush()
1379
1403
        self._write_end()
1380
1404
        self._medium_request.finished_writing()
1381