/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/protocol.py

  • Committer: Jelmer Vernooij
  • Date: 2018-08-26 02:01:46 UTC
  • mto: This revision was merged to the branch mainline in revision 7087.
  • Revision ID: jelmer@jelmer.uk-20180826020146-owq7fxsr6ermorlh
Fix remaining warnings on Python 3.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
 
21
from __future__ import absolute_import
 
22
 
21
23
import collections
22
 
from cStringIO import StringIO
23
24
import struct
24
25
import sys
25
 
import thread
26
 
import threading
 
26
try:
 
27
    import _thread
 
28
except ImportError:
 
29
    import thread as _thread
27
30
import time
28
31
 
29
 
import bzrlib
30
 
from bzrlib import (
 
32
import breezy
 
33
from ... import (
31
34
    debug,
32
35
    errors,
33
36
    osutils,
34
37
    )
35
 
from bzrlib.smart import message, request
36
 
from bzrlib.trace import log_exception_quietly, mutter
37
 
from bzrlib.bencode import bdecode_as_tuple, bencode
 
38
from ...sixish import (
 
39
    BytesIO,
 
40
    reraise,
 
41
)
 
42
from . import message, request
 
43
from ...sixish import text_type
 
44
from ...trace import log_exception_quietly, mutter
 
45
from ...bencode import bdecode_as_tuple, bencode
38
46
 
39
47
 
40
48
# Protocol version strings.  These are sent as prefixes of bzr requests and
41
49
# responses to identify the protocol version being used. (There are no version
42
50
# one strings because that version doesn't send any).
43
 
REQUEST_VERSION_TWO = 'bzr request 2\n'
44
 
RESPONSE_VERSION_TWO = 'bzr response 2\n'
 
51
REQUEST_VERSION_TWO = b'bzr request 2\n'
 
52
RESPONSE_VERSION_TWO = b'bzr response 2\n'
45
53
 
46
 
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
 
54
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
47
55
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
48
56
 
49
57
 
53
61
 
54
62
 
55
63
def _decode_tuple(req_line):
56
 
    if req_line is None or req_line == '':
 
64
    if req_line is None or req_line == b'':
57
65
        return None
58
 
    if req_line[-1] != '\n':
 
66
    if not req_line.endswith(b'\n'):
59
67
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
60
 
    return tuple(req_line[:-1].split('\x01'))
 
68
    return tuple(req_line[:-1].split(b'\x01'))
61
69
 
62
70
 
63
71
def _encode_tuple(args):
64
72
    """Encode the tuple args to a bytestream."""
65
 
    joined = '\x01'.join(args) + '\n'
66
 
    if type(joined) is unicode:
67
 
        # XXX: We should fix things so this never happens!  -AJB, 20100304
68
 
        mutter('response args contain unicode, should be only bytes: %r',
69
 
               joined)
70
 
        joined = joined.encode('ascii')
71
 
    return joined
 
73
    for arg in args:
 
74
        if isinstance(arg, text_type):
 
75
            raise TypeError(args)
 
76
    return b'\x01'.join(args) + b'\n'
72
77
 
73
78
 
74
79
class Requester(object):
112
117
    # support multiple chunks?
113
118
    def _encode_bulk_data(self, body):
114
119
        """Encode body as a bulk data chunk."""
115
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
120
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
116
121
 
117
122
    def _serialise_offsets(self, offsets):
118
123
        """Serialise a readv offset list."""
119
124
        txt = []
120
125
        for start, length in offsets:
121
 
            txt.append('%d,%d' % (start, length))
122
 
        return '\n'.join(txt)
 
126
            txt.append(b'%d,%d' % (start, length))
 
127
        return b'\n'.join(txt)
123
128
 
124
129
 
125
130
class SmartServerRequestProtocolOne(SmartProtocolBase):
130
135
        self._backing_transport = backing_transport
131
136
        self._root_client_path = root_client_path
132
137
        self._jail_root = jail_root
133
 
        self.unused_data = ''
 
138
        self.unused_data = b''
134
139
        self._finished = False
135
 
        self.in_buffer = ''
 
140
        self.in_buffer = b''
136
141
        self._has_dispatched = False
137
142
        self.request = None
138
143
        self._body_decoder = None
139
144
        self._write_func = write_func
140
145
 
141
 
    def accept_bytes(self, bytes):
 
146
    def accept_bytes(self, data):
142
147
        """Take bytes, and advance the internal state machine appropriately.
143
148
 
144
 
        :param bytes: must be a byte string
 
149
        :param data: must be a byte string
145
150
        """
146
 
        if not isinstance(bytes, str):
147
 
            raise ValueError(bytes)
148
 
        self.in_buffer += bytes
 
151
        if not isinstance(data, bytes):
 
152
            raise ValueError(data)
 
153
        self.in_buffer += data
149
154
        if not self._has_dispatched:
150
 
            if '\n' not in self.in_buffer:
 
155
            if b'\n' not in self.in_buffer:
151
156
                # no command line yet
152
157
                return
153
158
            self._has_dispatched = True
154
159
            try:
155
 
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
156
 
                first_line += '\n'
 
160
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
 
161
                first_line += b'\n'
157
162
                req_args = _decode_tuple(first_line)
158
163
                self.request = request.SmartServerRequestHandler(
159
164
                    self._backing_transport, commands=request.request_handlers,
163
168
                if self.request.finished_reading:
164
169
                    # trivial request
165
170
                    self.unused_data = self.in_buffer
166
 
                    self.in_buffer = ''
 
171
                    self.in_buffer = b''
167
172
                    self._send_response(self.request.response)
168
173
            except KeyboardInterrupt:
169
174
                raise
170
 
            except errors.UnknownSmartMethod, err:
 
175
            except errors.UnknownSmartMethod as err:
171
176
                protocol_error = errors.SmartProtocolError(
172
 
                    "bad request %r" % (err.verb,))
 
177
                    "bad request '%s'" % (err.verb.decode('ascii'),))
173
178
                failure = request.FailedSmartServerResponse(
174
 
                    ('error', str(protocol_error)))
 
179
                    (b'error', str(protocol_error).encode('utf-8')))
175
180
                self._send_response(failure)
176
181
                return
177
 
            except Exception, exception:
 
182
            except Exception as exception:
178
183
                # everything else: pass to client, flush, and quit
179
184
                log_exception_quietly()
180
185
                self._send_response(request.FailedSmartServerResponse(
181
 
                    ('error', str(exception))))
 
186
                    (b'error', str(exception).encode('utf-8'))))
182
187
                return
183
188
 
184
189
        if self._has_dispatched:
186
191
                # nothing to do.XXX: this routine should be a single state
187
192
                # machine too.
188
193
                self.unused_data += self.in_buffer
189
 
                self.in_buffer = ''
 
194
                self.in_buffer = b''
190
195
                return
191
196
            if self._body_decoder is None:
192
197
                self._body_decoder = LengthPrefixedBodyDecoder()
201
206
            if self.request.response is not None:
202
207
                self._send_response(self.request.response)
203
208
                self.unused_data = self.in_buffer
204
 
                self.in_buffer = ''
 
209
                self.in_buffer = b''
205
210
            else:
206
211
                if self.request.finished_reading:
207
212
                    raise AssertionError(
218
223
        self._write_success_or_failure_prefix(response)
219
224
        self._write_func(_encode_tuple(args))
220
225
        if body is not None:
221
 
            if not isinstance(body, str):
 
226
            if not isinstance(body, bytes):
222
227
                raise ValueError(body)
223
 
            bytes = self._encode_bulk_data(body)
224
 
            self._write_func(bytes)
 
228
            data = self._encode_bulk_data(body)
 
229
            self._write_func(data)
225
230
 
226
231
    def _write_protocol_version(self):
227
232
        """Write any prefixes this protocol requires.
258
263
    def _write_success_or_failure_prefix(self, response):
259
264
        """Write the protocol specific success/failure prefix."""
260
265
        if response.is_successful():
261
 
            self._write_func('success\n')
 
266
            self._write_func(b'success\n')
262
267
        else:
263
 
            self._write_func('failed\n')
 
268
            self._write_func(b'failed\n')
264
269
 
265
270
    def _write_protocol_version(self):
266
271
        r"""Write any prefixes this protocol requires.
278
283
        self._write_success_or_failure_prefix(response)
279
284
        self._write_func(_encode_tuple(response.args))
280
285
        if response.body is not None:
281
 
            if not isinstance(response.body, str):
282
 
                raise AssertionError('body must be a str')
 
286
            if not isinstance(response.body, bytes):
 
287
                raise AssertionError('body must be bytes')
283
288
            if not (response.body_stream is None):
284
289
                raise AssertionError(
285
290
                    'body_stream and body cannot both be set')
286
 
            bytes = self._encode_bulk_data(response.body)
287
 
            self._write_func(bytes)
 
291
            data = self._encode_bulk_data(response.body)
 
292
            self._write_func(data)
288
293
        elif response.body_stream is not None:
289
294
            _send_stream(response.body_stream, self._write_func)
290
295
 
291
296
 
292
297
def _send_stream(stream, write_func):
293
 
    write_func('chunked\n')
 
298
    write_func(b'chunked\n')
294
299
    _send_chunks(stream, write_func)
295
 
    write_func('END\n')
 
300
    write_func(b'END\n')
296
301
 
297
302
 
298
303
def _send_chunks(stream, write_func):
299
304
    for chunk in stream:
300
 
        if isinstance(chunk, str):
301
 
            bytes = "%x\n%s" % (len(chunk), chunk)
302
 
            write_func(bytes)
 
305
        if isinstance(chunk, bytes):
 
306
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
 
307
            write_func(data)
303
308
        elif isinstance(chunk, request.FailedSmartServerResponse):
304
 
            write_func('ERR\n')
 
309
            write_func(b'ERR\n')
305
310
            _send_chunks(chunk.args, write_func)
306
311
            return
307
312
        else:
339
344
        self.finished_reading = False
340
345
        self._in_buffer_list = []
341
346
        self._in_buffer_len = 0
342
 
        self.unused_data = ''
 
347
        self.unused_data = b''
343
348
        self.bytes_left = None
344
349
        self._number_needed_bytes = None
345
350
 
346
351
    def _get_in_buffer(self):
347
352
        if len(self._in_buffer_list) == 1:
348
353
            return self._in_buffer_list[0]
349
 
        in_buffer = ''.join(self._in_buffer_list)
 
354
        in_buffer = b''.join(self._in_buffer_list)
350
355
        if len(in_buffer) != self._in_buffer_len:
351
356
            raise AssertionError(
352
357
                "Length of buffer did not match expected value: %s != %s"
375
380
 
376
381
    def _set_in_buffer(self, new_buf):
377
382
        if new_buf is not None:
 
383
            if not isinstance(new_buf, bytes):
 
384
                raise TypeError(new_buf)
378
385
            self._in_buffer_list = [new_buf]
379
386
            self._in_buffer_len = len(new_buf)
380
387
        else:
381
388
            self._in_buffer_list = []
382
389
            self._in_buffer_len = 0
383
390
 
384
 
    def accept_bytes(self, bytes):
 
391
    def accept_bytes(self, new_buf):
385
392
        """Decode as much of bytes as possible.
386
393
 
387
 
        If 'bytes' contains too much data it will be appended to
 
394
        If 'new_buf' contains too much data it will be appended to
388
395
        self.unused_data.
389
396
 
390
397
        finished_reading will be set when no more data is required.  Further
391
398
        data will be appended to self.unused_data.
392
399
        """
 
400
        if not isinstance(new_buf, bytes):
 
401
            raise TypeError(new_buf)
393
402
        # accept_bytes is allowed to change the state
394
403
        self._number_needed_bytes = None
395
404
        # lsprof puts a very large amount of time on this specific call for
396
405
        # large readv arrays
397
 
        self._in_buffer_list.append(bytes)
398
 
        self._in_buffer_len += len(bytes)
 
406
        self._in_buffer_list.append(new_buf)
 
407
        self._in_buffer_len += len(new_buf)
399
408
        try:
400
409
            # Run the function for the current state.
401
410
            current_state = self.state_accept
408
417
                #     _NeedMoreBytes).
409
418
                current_state = self.state_accept
410
419
                self.state_accept()
411
 
        except _NeedMoreBytes, e:
 
420
        except _NeedMoreBytes as e:
412
421
            self._number_needed_bytes = e.count
413
422
 
414
423
 
458
467
 
459
468
    def _extract_line(self):
460
469
        in_buf = self._get_in_buffer()
461
 
        pos = in_buf.find('\n')
 
470
        pos = in_buf.find(b'\n')
462
471
        if pos == -1:
463
472
            # We haven't read a complete line yet, so request more bytes before
464
473
            # we continue.
481
490
 
482
491
    def _state_accept_expecting_header(self):
483
492
        prefix = self._extract_line()
484
 
        if prefix == 'chunked':
 
493
        if prefix == b'chunked':
485
494
            self.state_accept = self._state_accept_expecting_length
486
495
        else:
487
496
            raise errors.SmartProtocolError(
489
498
 
490
499
    def _state_accept_expecting_length(self):
491
500
        prefix = self._extract_line()
492
 
        if prefix == 'ERR':
 
501
        if prefix == b'ERR':
493
502
            self.error = True
494
503
            self.error_in_progress = []
495
504
            self._state_accept_expecting_length()
496
505
            return
497
 
        elif prefix == 'END':
 
506
        elif prefix == b'END':
498
507
            # We've read the end-of-body marker.
499
508
            # Any further bytes are unused data, including the bytes left in
500
509
            # the _in_buffer.
502
511
            return
503
512
        else:
504
513
            self.bytes_left = int(prefix, 16)
505
 
            self.chunk_in_progress = ''
 
514
            self.chunk_in_progress = b''
506
515
            self.state_accept = self._state_accept_reading_chunk
507
516
 
508
517
    def _state_accept_reading_chunk(self):
533
542
        _StatefulDecoder.__init__(self)
534
543
        self.state_accept = self._state_accept_expecting_length
535
544
        self.state_read = self._state_read_no_data
536
 
        self._body = ''
537
 
        self._trailer_buffer = ''
 
545
        self._body = b''
 
546
        self._trailer_buffer = b''
538
547
 
539
548
    def next_read_size(self):
540
549
        if self.bytes_left is not None:
558
567
 
559
568
    def _state_accept_expecting_length(self):
560
569
        in_buf = self._get_in_buffer()
561
 
        pos = in_buf.find('\n')
 
570
        pos = in_buf.find(b'\n')
562
571
        if pos == -1:
563
572
            return
564
573
        self.bytes_left = int(in_buf[:pos])
584
593
        self._set_in_buffer(None)
585
594
        # TODO: what if the trailer does not match "done\n"?  Should this raise
586
595
        # a ProtocolViolation exception?
587
 
        if self._trailer_buffer.startswith('done\n'):
588
 
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
596
        if self._trailer_buffer.startswith(b'done\n'):
 
597
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
589
598
            self.state_accept = self._state_accept_reading_unused
590
599
            self.finished_reading = True
591
600
 
594
603
        self._set_in_buffer(None)
595
604
 
596
605
    def _state_read_no_data(self):
597
 
        return ''
 
606
        return b''
598
607
 
599
608
    def _state_read_body_buffer(self):
600
609
        result = self._body
601
 
        self._body = ''
 
610
        self._body = b''
602
611
        return result
603
612
 
604
613
 
654
663
        """Make a remote call with a readv array.
655
664
 
656
665
        The body is encoded with one line per readv offset pair. The numbers in
657
 
        each pair are separated by a comma, and no trailing \n is emitted.
 
666
        each pair are separated by a comma, and no trailing \\n is emitted.
658
667
        """
659
668
        if 'hpss' in debug.debug_flags:
660
669
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
718
727
        # returned in response to existing version 1 smart requests.  Responses
719
728
        # starting with these codes are always "failed" responses.
720
729
        v1_error_codes = [
721
 
            'norepository',
722
 
            'NoSuchFile',
723
 
            'FileExists',
724
 
            'DirectoryNotEmpty',
725
 
            'ShortReadvError',
726
 
            'UnicodeEncodeError',
727
 
            'UnicodeDecodeError',
728
 
            'ReadOnlyError',
729
 
            'nobranch',
730
 
            'NoSuchRevision',
731
 
            'nosuchrevision',
732
 
            'LockContention',
733
 
            'UnlockableTransport',
734
 
            'LockFailed',
735
 
            'TokenMismatch',
736
 
            'ReadError',
737
 
            'PermissionDenied',
 
730
            b'norepository',
 
731
            b'NoSuchFile',
 
732
            b'FileExists',
 
733
            b'DirectoryNotEmpty',
 
734
            b'ShortReadvError',
 
735
            b'UnicodeEncodeError',
 
736
            b'UnicodeDecodeError',
 
737
            b'ReadOnlyError',
 
738
            b'nobranch',
 
739
            b'NoSuchRevision',
 
740
            b'nosuchrevision',
 
741
            b'LockContention',
 
742
            b'UnlockableTransport',
 
743
            b'LockFailed',
 
744
            b'TokenMismatch',
 
745
            b'ReadError',
 
746
            b'PermissionDenied',
738
747
            ]
739
748
        if result_tuple[0] in v1_error_codes:
740
749
            self._request.finished_reading()
749
758
        :param verb: The verb used in that call.
750
759
        :raises: UnexpectedSmartServerResponse
751
760
        """
752
 
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
753
 
                "bad request '%s'" % self._last_verb) or
754
 
              result_tuple == ('error', "Generic bzr smart protocol error: "
755
 
                "bad request u'%s'" % self._last_verb)):
 
761
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
 
762
                b"bad request '" + self._last_verb + b"'") or
 
763
              result_tuple == (b'error', b"Generic bzr smart protocol error: "
 
764
                b"bad request u'%s'" % self._last_verb)):
756
765
            # The response will have no body, so we've finished reading.
757
766
            self._request.finished_reading()
758
767
            raise errors.UnknownSmartMethod(self._last_verb)
769
778
 
770
779
        while not _body_decoder.finished_reading:
771
780
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
772
 
            if bytes == '':
 
781
            if bytes == b'':
773
782
                # end of file encountered reading from server
774
783
                raise errors.ConnectionReset(
775
784
                    "Connection lost while reading response body.")
776
785
            _body_decoder.accept_bytes(bytes)
777
786
        self._request.finished_reading()
778
 
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
787
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
779
788
        # XXX: TODO check the trailer result.
780
789
        if 'hpss' in debug.debug_flags:
781
790
            mutter('              %d body bytes read',
788
797
 
789
798
    def query_version(self):
790
799
        """Return protocol version number of the server."""
791
 
        self.call('hello')
 
800
        self.call(b'hello')
792
801
        resp = self.read_response_tuple()
793
 
        if resp == ('ok', '1'):
 
802
        if resp == (b'ok', b'1'):
794
803
            return 1
795
 
        elif resp == ('ok', '2'):
 
804
        elif resp == (b'ok', b'2'):
796
805
            return 2
797
806
        else:
798
807
            raise errors.SmartProtocolError("bad response %r" % (resp,))
830
839
        response_status = self._request.read_line()
831
840
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
832
841
        self._response_is_unknown_method(result)
833
 
        if response_status == 'success\n':
 
842
        if response_status == b'success\n':
834
843
            self.response_status = True
835
844
            if not expect_body:
836
845
                self._request.finished_reading()
837
846
            return result
838
 
        elif response_status == 'failed\n':
 
847
        elif response_status == b'failed\n':
839
848
            self.response_status = False
840
849
            self._request.finished_reading()
841
850
            raise errors.ErrorFromSmartServer(result)
858
867
        _body_decoder = ChunkedBodyDecoder()
859
868
        while not _body_decoder.finished_reading:
860
869
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
861
 
            if bytes == '':
 
870
            if bytes == b'':
862
871
                # end of file encountered reading from server
863
872
                raise errors.ConnectionReset(
864
873
                    "Connection lost while reading streamed body.")
865
874
            _body_decoder.accept_bytes(bytes)
866
875
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
867
 
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
 
876
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
868
877
                    mutter('              %d byte chunk read',
869
878
                           len(body_bytes))
870
879
                yield body_bytes
907
916
            _StatefulDecoder.accept_bytes(self, bytes)
908
917
        except KeyboardInterrupt:
909
918
            raise
910
 
        except errors.SmartMessageHandlerError, exception:
 
919
        except errors.SmartMessageHandlerError as exception:
911
920
            # We do *not* set self.decoding_failed here.  The message handler
912
921
            # has raised an error, but the decoder is still able to parse bytes
913
922
            # and determine when this message ends.
917
926
            # The state machine is ready to continue decoding, but the
918
927
            # exception has interrupted the loop that runs the state machine.
919
928
            # So we call accept_bytes again to restart it.
920
 
            self.accept_bytes('')
921
 
        except Exception, exception:
 
929
            self.accept_bytes(b'')
 
930
        except Exception as exception:
922
931
            # The decoder itself has raised an exception.  We cannot continue
923
932
            # decoding.
924
933
            self.decoding_failed = True
965
974
            # The buffer is empty
966
975
            raise _NeedMoreBytes(1)
967
976
        in_buf = self._get_in_buffer()
968
 
        one_byte = in_buf[0]
 
977
        one_byte = in_buf[0:1]
969
978
        self._set_in_buffer(in_buf[1:])
970
979
        return one_byte
971
980
 
992
1001
 
993
1002
    def _state_accept_expecting_headers(self):
994
1003
        decoded = self._extract_prefixed_bencoded_data()
995
 
        if type(decoded) is not dict:
 
1004
        if not isinstance(decoded, dict):
996
1005
            raise errors.SmartProtocolError(
997
1006
                'Header object %r is not a dict' % (decoded,))
998
1007
        self.state_accept = self._state_accept_expecting_message_part
1003
1012
 
1004
1013
    def _state_accept_expecting_message_part(self):
1005
1014
        message_part_kind = self._extract_single_byte()
1006
 
        if message_part_kind == 'o':
 
1015
        if message_part_kind == b'o':
1007
1016
            self.state_accept = self._state_accept_expecting_one_byte
1008
 
        elif message_part_kind == 's':
 
1017
        elif message_part_kind == b's':
1009
1018
            self.state_accept = self._state_accept_expecting_structure
1010
 
        elif message_part_kind == 'b':
 
1019
        elif message_part_kind == b'b':
1011
1020
            self.state_accept = self._state_accept_expecting_bytes
1012
 
        elif message_part_kind == 'e':
 
1021
        elif message_part_kind == b'e':
1013
1022
            self.done()
1014
1023
        else:
1015
1024
            raise errors.SmartProtocolError(
1081
1090
        self._real_write_func = write_func
1082
1091
 
1083
1092
    def _write_func(self, bytes):
1084
 
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
1085
 
        #       for total number of bytes to write, rather than buffer based on
1086
 
        #       the number of write() calls
1087
1093
        # TODO: Another possibility would be to turn this into an async model.
1088
1094
        #       Where we let another thread know that we have some bytes if
1089
1095
        #       they want it, but we don't actually block for it
1096
1102
 
1097
1103
    def flush(self):
1098
1104
        if self._buf:
1099
 
            self._real_write_func(''.join(self._buf))
 
1105
            self._real_write_func(b''.join(self._buf))
1100
1106
            del self._buf[:]
1101
1107
            self._buf_len = 0
1102
1108
 
1104
1110
        """Serialise a readv offset list."""
1105
1111
        txt = []
1106
1112
        for start, length in offsets:
1107
 
            txt.append('%d,%d' % (start, length))
1108
 
        return '\n'.join(txt)
 
1113
            txt.append(b'%d,%d' % (start, length))
 
1114
        return b'\n'.join(txt)
1109
1115
 
1110
1116
    def _write_protocol_version(self):
1111
1117
        self._write_func(MESSAGE_VERSION_THREE)
1119
1125
        self._write_prefixed_bencode(headers)
1120
1126
 
1121
1127
    def _write_structure(self, args):
1122
 
        self._write_func('s')
 
1128
        self._write_func(b's')
1123
1129
        utf8_args = []
1124
1130
        for arg in args:
1125
 
            if type(arg) is unicode:
 
1131
            if isinstance(arg, text_type):
1126
1132
                utf8_args.append(arg.encode('utf8'))
1127
1133
            else:
1128
1134
                utf8_args.append(arg)
1129
1135
        self._write_prefixed_bencode(utf8_args)
1130
1136
 
1131
1137
    def _write_end(self):
1132
 
        self._write_func('e')
 
1138
        self._write_func(b'e')
1133
1139
        self.flush()
1134
1140
 
1135
1141
    def _write_prefixed_body(self, bytes):
1136
 
        self._write_func('b')
 
1142
        self._write_func(b'b')
1137
1143
        self._write_func(struct.pack('!L', len(bytes)))
1138
1144
        self._write_func(bytes)
1139
1145
 
1140
1146
    def _write_chunked_body_start(self):
1141
 
        self._write_func('oC')
 
1147
        self._write_func(b'oC')
1142
1148
 
1143
1149
    def _write_error_status(self):
1144
 
        self._write_func('oE')
 
1150
        self._write_func(b'oE')
1145
1151
 
1146
1152
    def _write_success_status(self):
1147
 
        self._write_func('oS')
 
1153
        self._write_func(b'oS')
1148
1154
 
1149
1155
 
1150
1156
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1152
1158
    def __init__(self, write_func):
1153
1159
        _ProtocolThreeEncoder.__init__(self, write_func)
1154
1160
        self.response_sent = False
1155
 
        self._headers = {'Software version': bzrlib.__version__}
 
1161
        self._headers = {
 
1162
                b'Software version': breezy.__version__.encode('utf-8')}
1156
1163
        if 'hpss' in debug.debug_flags:
1157
 
            self._thread_id = thread.get_ident()
 
1164
            self._thread_id = _thread.get_ident()
1158
1165
            self._response_start_time = None
1159
1166
 
1160
1167
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1180
1187
                % (exception,))
1181
1188
        if isinstance(exception, errors.UnknownSmartMethod):
1182
1189
            failure = request.FailedSmartServerResponse(
1183
 
                ('UnknownMethod', exception.verb))
 
1190
                (b'UnknownMethod', exception.verb))
1184
1191
            self.send_response(failure)
1185
1192
            return
1186
1193
        if 'hpss' in debug.debug_flags:
1189
1196
        self._write_protocol_version()
1190
1197
        self._write_headers(self._headers)
1191
1198
        self._write_error_status()
1192
 
        self._write_structure(('error', str(exception)))
 
1199
        self._write_structure((b'error', str(exception).encode('utf-8', 'replace')))
1193
1200
        self._write_end()
1194
1201
 
1195
1202
    def send_response(self, response):
1231
1238
                    if first_chunk is None:
1232
1239
                        first_chunk = chunk
1233
1240
                    self._write_prefixed_body(chunk)
 
1241
                    self.flush()
1234
1242
                    if 'hpssdetail' in debug.debug_flags:
1235
1243
                        # Not worth timing separately, as _write_func is
1236
1244
                        # actually buffered
1273
1281
    iterator = iter(iterable)
1274
1282
    while True:
1275
1283
        try:
1276
 
            yield None, iterator.next()
 
1284
            yield None, next(iterator)
1277
1285
        except StopIteration:
1278
1286
            return
1279
1287
        except (KeyboardInterrupt, SystemExit):
1291
1299
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1292
1300
        self._medium_request = medium_request
1293
1301
        self._headers = {}
 
1302
        self.body_stream_started = None
1294
1303
 
1295
1304
    def set_headers(self, headers):
1296
1305
        self._headers = headers.copy()
1331
1340
        """Make a remote call with a readv array.
1332
1341
 
1333
1342
        The body is encoded with one line per readv offset pair. The numbers in
1334
 
        each pair are separated by a comma, and no trailing \n is emitted.
 
1343
        each pair are separated by a comma, and no trailing \\n is emitted.
1335
1344
        """
1336
1345
        if 'hpss' in debug.debug_flags:
1337
1346
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1356
1365
            if path is not None:
1357
1366
                mutter('                  (to %s)', path)
1358
1367
            self._request_start_time = osutils.timer_func()
 
1368
        self.body_stream_started = False
1359
1369
        self._write_protocol_version()
1360
1370
        self._write_headers(self._headers)
1361
1371
        self._write_structure(args)
1363
1373
        #       have finished sending the stream.  We would notice at the end
1364
1374
        #       anyway, but if the medium can deliver it early then it's good
1365
1375
        #       to short-circuit the whole request...
 
1376
        # Provoke any ConnectionReset failures before we start the body stream.
 
1377
        self.flush()
 
1378
        self.body_stream_started = True
1366
1379
        for exc_info, part in _iter_with_errors(stream):
1367
1380
            if exc_info is not None:
1368
1381
                # Iterating the stream failed.  Cleanly abort the request.
1369
1382
                self._write_error_status()
1370
1383
                # Currently the client unconditionally sends ('error',) as the
1371
1384
                # error args.
1372
 
                self._write_structure(('error',))
 
1385
                self._write_structure((b'error',))
1373
1386
                self._write_end()
1374
1387
                self._medium_request.finished_writing()
1375
 
                raise exc_info[0], exc_info[1], exc_info[2]
 
1388
                try:
 
1389
                    reraise(*exc_info)
 
1390
                finally:
 
1391
                    del exc_info
1376
1392
            else:
1377
1393
                self._write_prefixed_body(part)
1378
1394
                self.flush()