/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
from __future__ import absolute_import
22
 
 
23
21
import collections
 
22
from cStringIO import StringIO
24
23
import struct
25
24
import sys
26
 
try:
27
 
    import _thread
28
 
except ImportError:
29
 
    import thread as _thread
 
25
import thread
 
26
import threading
30
27
import time
31
28
 
32
 
import breezy
33
 
from ... import (
 
29
import bzrlib
 
30
from bzrlib import (
34
31
    debug,
35
32
    errors,
36
33
    osutils,
37
34
    )
38
 
from ...sixish import (
39
 
    BytesIO,
40
 
    reraise,
41
 
)
42
 
from . import message, request
43
 
from ...sixish import text_type
44
 
from ...trace import log_exception_quietly, mutter
45
 
from ...bencode import bdecode_as_tuple, bencode
 
35
from bzrlib.smart import message, request
 
36
from bzrlib.trace import log_exception_quietly, mutter
 
37
from bzrlib.bencode import bdecode_as_tuple, bencode
46
38
 
47
39
 
48
40
# Protocol version strings.  These are sent as prefixes of bzr requests and
49
41
# responses to identify the protocol version being used. (There are no version
50
42
# one strings because that version doesn't send any).
51
 
REQUEST_VERSION_TWO = b'bzr request 2\n'
52
 
RESPONSE_VERSION_TWO = b'bzr response 2\n'
 
43
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
44
RESPONSE_VERSION_TWO = 'bzr response 2\n'
53
45
 
54
 
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
 
46
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
55
47
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
56
48
 
57
49
 
61
53
 
62
54
 
63
55
def _decode_tuple(req_line):
64
 
    if req_line is None or req_line == b'':
 
56
    if req_line is None or req_line == '':
65
57
        return None
66
 
    if not req_line.endswith(b'\n'):
 
58
    if req_line[-1] != '\n':
67
59
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
68
 
    return tuple(req_line[:-1].split(b'\x01'))
 
60
    return tuple(req_line[:-1].split('\x01'))
69
61
 
70
62
 
71
63
def _encode_tuple(args):
72
64
    """Encode the tuple args to a bytestream."""
73
 
    for arg in args:
74
 
        if isinstance(arg, text_type):
75
 
            raise TypeError(args)
76
 
    return b'\x01'.join(args) + b'\n'
 
65
    joined = '\x01'.join(args) + '\n'
 
66
    if type(joined) is unicode:
 
67
        # XXX: We should fix things so this never happens!  -AJB, 20100304
 
68
        mutter('response args contain unicode, should be only bytes: %r',
 
69
               joined)
 
70
        joined = joined.encode('ascii')
 
71
    return joined
77
72
 
78
73
 
79
74
class Requester(object):
117
112
    # support multiple chunks?
118
113
    def _encode_bulk_data(self, body):
119
114
        """Encode body as a bulk data chunk."""
120
 
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
 
115
        return ''.join(('%d\n' % len(body), body, 'done\n'))
121
116
 
122
117
    def _serialise_offsets(self, offsets):
123
118
        """Serialise a readv offset list."""
124
119
        txt = []
125
120
        for start, length in offsets:
126
 
            txt.append(b'%d,%d' % (start, length))
127
 
        return b'\n'.join(txt)
 
121
            txt.append('%d,%d' % (start, length))
 
122
        return '\n'.join(txt)
128
123
 
129
124
 
130
125
class SmartServerRequestProtocolOne(SmartProtocolBase):
135
130
        self._backing_transport = backing_transport
136
131
        self._root_client_path = root_client_path
137
132
        self._jail_root = jail_root
138
 
        self.unused_data = b''
 
133
        self.unused_data = ''
139
134
        self._finished = False
140
 
        self.in_buffer = b''
 
135
        self.in_buffer = ''
141
136
        self._has_dispatched = False
142
137
        self.request = None
143
138
        self._body_decoder = None
144
139
        self._write_func = write_func
145
140
 
146
 
    def accept_bytes(self, data):
 
141
    def accept_bytes(self, bytes):
147
142
        """Take bytes, and advance the internal state machine appropriately.
148
143
 
149
 
        :param data: must be a byte string
 
144
        :param bytes: must be a byte string
150
145
        """
151
 
        if not isinstance(data, bytes):
152
 
            raise ValueError(data)
153
 
        self.in_buffer += data
 
146
        if not isinstance(bytes, str):
 
147
            raise ValueError(bytes)
 
148
        self.in_buffer += bytes
154
149
        if not self._has_dispatched:
155
 
            if b'\n' not in self.in_buffer:
 
150
            if '\n' not in self.in_buffer:
156
151
                # no command line yet
157
152
                return
158
153
            self._has_dispatched = True
159
154
            try:
160
 
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
161
 
                first_line += b'\n'
 
155
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
156
                first_line += '\n'
162
157
                req_args = _decode_tuple(first_line)
163
158
                self.request = request.SmartServerRequestHandler(
164
159
                    self._backing_transport, commands=request.request_handlers,
168
163
                if self.request.finished_reading:
169
164
                    # trivial request
170
165
                    self.unused_data = self.in_buffer
171
 
                    self.in_buffer = b''
 
166
                    self.in_buffer = ''
172
167
                    self._send_response(self.request.response)
173
168
            except KeyboardInterrupt:
174
169
                raise
175
 
            except errors.UnknownSmartMethod as err:
 
170
            except errors.UnknownSmartMethod, err:
176
171
                protocol_error = errors.SmartProtocolError(
177
 
                    "bad request '%s'" % (err.verb.decode('ascii'),))
 
172
                    "bad request %r" % (err.verb,))
178
173
                failure = request.FailedSmartServerResponse(
179
 
                    (b'error', str(protocol_error).encode('utf-8')))
 
174
                    ('error', str(protocol_error)))
180
175
                self._send_response(failure)
181
176
                return
182
 
            except Exception as exception:
 
177
            except Exception, exception:
183
178
                # everything else: pass to client, flush, and quit
184
179
                log_exception_quietly()
185
180
                self._send_response(request.FailedSmartServerResponse(
186
 
                    (b'error', str(exception).encode('utf-8'))))
 
181
                    ('error', str(exception))))
187
182
                return
188
183
 
189
184
        if self._has_dispatched:
191
186
                # nothing to do.XXX: this routine should be a single state
192
187
                # machine too.
193
188
                self.unused_data += self.in_buffer
194
 
                self.in_buffer = b''
 
189
                self.in_buffer = ''
195
190
                return
196
191
            if self._body_decoder is None:
197
192
                self._body_decoder = LengthPrefixedBodyDecoder()
206
201
            if self.request.response is not None:
207
202
                self._send_response(self.request.response)
208
203
                self.unused_data = self.in_buffer
209
 
                self.in_buffer = b''
 
204
                self.in_buffer = ''
210
205
            else:
211
206
                if self.request.finished_reading:
212
207
                    raise AssertionError(
223
218
        self._write_success_or_failure_prefix(response)
224
219
        self._write_func(_encode_tuple(args))
225
220
        if body is not None:
226
 
            if not isinstance(body, bytes):
 
221
            if not isinstance(body, str):
227
222
                raise ValueError(body)
228
 
            data = self._encode_bulk_data(body)
229
 
            self._write_func(data)
 
223
            bytes = self._encode_bulk_data(body)
 
224
            self._write_func(bytes)
230
225
 
231
226
    def _write_protocol_version(self):
232
227
        """Write any prefixes this protocol requires.
263
258
    def _write_success_or_failure_prefix(self, response):
264
259
        """Write the protocol specific success/failure prefix."""
265
260
        if response.is_successful():
266
 
            self._write_func(b'success\n')
 
261
            self._write_func('success\n')
267
262
        else:
268
 
            self._write_func(b'failed\n')
 
263
            self._write_func('failed\n')
269
264
 
270
265
    def _write_protocol_version(self):
271
266
        r"""Write any prefixes this protocol requires.
283
278
        self._write_success_or_failure_prefix(response)
284
279
        self._write_func(_encode_tuple(response.args))
285
280
        if response.body is not None:
286
 
            if not isinstance(response.body, bytes):
287
 
                raise AssertionError('body must be bytes')
 
281
            if not isinstance(response.body, str):
 
282
                raise AssertionError('body must be a str')
288
283
            if not (response.body_stream is None):
289
284
                raise AssertionError(
290
285
                    'body_stream and body cannot both be set')
291
 
            data = self._encode_bulk_data(response.body)
292
 
            self._write_func(data)
 
286
            bytes = self._encode_bulk_data(response.body)
 
287
            self._write_func(bytes)
293
288
        elif response.body_stream is not None:
294
289
            _send_stream(response.body_stream, self._write_func)
295
290
 
296
291
 
297
292
def _send_stream(stream, write_func):
298
 
    write_func(b'chunked\n')
 
293
    write_func('chunked\n')
299
294
    _send_chunks(stream, write_func)
300
 
    write_func(b'END\n')
 
295
    write_func('END\n')
301
296
 
302
297
 
303
298
def _send_chunks(stream, write_func):
304
299
    for chunk in stream:
305
 
        if isinstance(chunk, bytes):
306
 
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
307
 
            write_func(data)
 
300
        if isinstance(chunk, str):
 
301
            bytes = "%x\n%s" % (len(chunk), chunk)
 
302
            write_func(bytes)
308
303
        elif isinstance(chunk, request.FailedSmartServerResponse):
309
 
            write_func(b'ERR\n')
 
304
            write_func('ERR\n')
310
305
            _send_chunks(chunk.args, write_func)
311
306
            return
312
307
        else:
344
339
        self.finished_reading = False
345
340
        self._in_buffer_list = []
346
341
        self._in_buffer_len = 0
347
 
        self.unused_data = b''
 
342
        self.unused_data = ''
348
343
        self.bytes_left = None
349
344
        self._number_needed_bytes = None
350
345
 
351
346
    def _get_in_buffer(self):
352
347
        if len(self._in_buffer_list) == 1:
353
348
            return self._in_buffer_list[0]
354
 
        in_buffer = b''.join(self._in_buffer_list)
 
349
        in_buffer = ''.join(self._in_buffer_list)
355
350
        if len(in_buffer) != self._in_buffer_len:
356
351
            raise AssertionError(
357
352
                "Length of buffer did not match expected value: %s != %s"
380
375
 
381
376
    def _set_in_buffer(self, new_buf):
382
377
        if new_buf is not None:
383
 
            if not isinstance(new_buf, bytes):
384
 
                raise TypeError(new_buf)
385
378
            self._in_buffer_list = [new_buf]
386
379
            self._in_buffer_len = len(new_buf)
387
380
        else:
388
381
            self._in_buffer_list = []
389
382
            self._in_buffer_len = 0
390
383
 
391
 
    def accept_bytes(self, new_buf):
 
384
    def accept_bytes(self, bytes):
392
385
        """Decode as much of bytes as possible.
393
386
 
394
 
        If 'new_buf' contains too much data it will be appended to
 
387
        If 'bytes' contains too much data it will be appended to
395
388
        self.unused_data.
396
389
 
397
390
        finished_reading will be set when no more data is required.  Further
398
391
        data will be appended to self.unused_data.
399
392
        """
400
 
        if not isinstance(new_buf, bytes):
401
 
            raise TypeError(new_buf)
402
393
        # accept_bytes is allowed to change the state
403
394
        self._number_needed_bytes = None
404
395
        # lsprof puts a very large amount of time on this specific call for
405
396
        # large readv arrays
406
 
        self._in_buffer_list.append(new_buf)
407
 
        self._in_buffer_len += len(new_buf)
 
397
        self._in_buffer_list.append(bytes)
 
398
        self._in_buffer_len += len(bytes)
408
399
        try:
409
400
            # Run the function for the current state.
410
401
            current_state = self.state_accept
417
408
                #     _NeedMoreBytes).
418
409
                current_state = self.state_accept
419
410
                self.state_accept()
420
 
        except _NeedMoreBytes as e:
 
411
        except _NeedMoreBytes, e:
421
412
            self._number_needed_bytes = e.count
422
413
 
423
414
 
467
458
 
468
459
    def _extract_line(self):
469
460
        in_buf = self._get_in_buffer()
470
 
        pos = in_buf.find(b'\n')
 
461
        pos = in_buf.find('\n')
471
462
        if pos == -1:
472
463
            # We haven't read a complete line yet, so request more bytes before
473
464
            # we continue.
490
481
 
491
482
    def _state_accept_expecting_header(self):
492
483
        prefix = self._extract_line()
493
 
        if prefix == b'chunked':
 
484
        if prefix == 'chunked':
494
485
            self.state_accept = self._state_accept_expecting_length
495
486
        else:
496
487
            raise errors.SmartProtocolError(
498
489
 
499
490
    def _state_accept_expecting_length(self):
500
491
        prefix = self._extract_line()
501
 
        if prefix == b'ERR':
 
492
        if prefix == 'ERR':
502
493
            self.error = True
503
494
            self.error_in_progress = []
504
495
            self._state_accept_expecting_length()
505
496
            return
506
 
        elif prefix == b'END':
 
497
        elif prefix == 'END':
507
498
            # We've read the end-of-body marker.
508
499
            # Any further bytes are unused data, including the bytes left in
509
500
            # the _in_buffer.
511
502
            return
512
503
        else:
513
504
            self.bytes_left = int(prefix, 16)
514
 
            self.chunk_in_progress = b''
 
505
            self.chunk_in_progress = ''
515
506
            self.state_accept = self._state_accept_reading_chunk
516
507
 
517
508
    def _state_accept_reading_chunk(self):
542
533
        _StatefulDecoder.__init__(self)
543
534
        self.state_accept = self._state_accept_expecting_length
544
535
        self.state_read = self._state_read_no_data
545
 
        self._body = b''
546
 
        self._trailer_buffer = b''
 
536
        self._body = ''
 
537
        self._trailer_buffer = ''
547
538
 
548
539
    def next_read_size(self):
549
540
        if self.bytes_left is not None:
567
558
 
568
559
    def _state_accept_expecting_length(self):
569
560
        in_buf = self._get_in_buffer()
570
 
        pos = in_buf.find(b'\n')
 
561
        pos = in_buf.find('\n')
571
562
        if pos == -1:
572
563
            return
573
564
        self.bytes_left = int(in_buf[:pos])
593
584
        self._set_in_buffer(None)
594
585
        # TODO: what if the trailer does not match "done\n"?  Should this raise
595
586
        # a ProtocolViolation exception?
596
 
        if self._trailer_buffer.startswith(b'done\n'):
597
 
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
 
587
        if self._trailer_buffer.startswith('done\n'):
 
588
            self.unused_data = self._trailer_buffer[len('done\n'):]
598
589
            self.state_accept = self._state_accept_reading_unused
599
590
            self.finished_reading = True
600
591
 
603
594
        self._set_in_buffer(None)
604
595
 
605
596
    def _state_read_no_data(self):
606
 
        return b''
 
597
        return ''
607
598
 
608
599
    def _state_read_body_buffer(self):
609
600
        result = self._body
610
 
        self._body = b''
 
601
        self._body = ''
611
602
        return result
612
603
 
613
604
 
663
654
        """Make a remote call with a readv array.
664
655
 
665
656
        The body is encoded with one line per readv offset pair. The numbers in
666
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
657
        each pair are separated by a comma, and no trailing \n is emitted.
667
658
        """
668
659
        if 'hpss' in debug.debug_flags:
669
660
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
727
718
        # returned in response to existing version 1 smart requests.  Responses
728
719
        # starting with these codes are always "failed" responses.
729
720
        v1_error_codes = [
730
 
            b'norepository',
731
 
            b'NoSuchFile',
732
 
            b'FileExists',
733
 
            b'DirectoryNotEmpty',
734
 
            b'ShortReadvError',
735
 
            b'UnicodeEncodeError',
736
 
            b'UnicodeDecodeError',
737
 
            b'ReadOnlyError',
738
 
            b'nobranch',
739
 
            b'NoSuchRevision',
740
 
            b'nosuchrevision',
741
 
            b'LockContention',
742
 
            b'UnlockableTransport',
743
 
            b'LockFailed',
744
 
            b'TokenMismatch',
745
 
            b'ReadError',
746
 
            b'PermissionDenied',
 
721
            'norepository',
 
722
            'NoSuchFile',
 
723
            'FileExists',
 
724
            'DirectoryNotEmpty',
 
725
            'ShortReadvError',
 
726
            'UnicodeEncodeError',
 
727
            'UnicodeDecodeError',
 
728
            'ReadOnlyError',
 
729
            'nobranch',
 
730
            'NoSuchRevision',
 
731
            'nosuchrevision',
 
732
            'LockContention',
 
733
            'UnlockableTransport',
 
734
            'LockFailed',
 
735
            'TokenMismatch',
 
736
            'ReadError',
 
737
            'PermissionDenied',
747
738
            ]
748
739
        if result_tuple[0] in v1_error_codes:
749
740
            self._request.finished_reading()
758
749
        :param verb: The verb used in that call.
759
750
        :raises: UnexpectedSmartServerResponse
760
751
        """
761
 
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
762
 
                b"bad request '" + self._last_verb + b"'") or
763
 
              result_tuple == (b'error', b"Generic bzr smart protocol error: "
764
 
                b"bad request u'%s'" % self._last_verb)):
 
752
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
 
753
                "bad request '%s'" % self._last_verb) or
 
754
              result_tuple == ('error', "Generic bzr smart protocol error: "
 
755
                "bad request u'%s'" % self._last_verb)):
765
756
            # The response will have no body, so we've finished reading.
766
757
            self._request.finished_reading()
767
758
            raise errors.UnknownSmartMethod(self._last_verb)
778
769
 
779
770
        while not _body_decoder.finished_reading:
780
771
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
781
 
            if bytes == b'':
 
772
            if bytes == '':
782
773
                # end of file encountered reading from server
783
774
                raise errors.ConnectionReset(
784
775
                    "Connection lost while reading response body.")
785
776
            _body_decoder.accept_bytes(bytes)
786
777
        self._request.finished_reading()
787
 
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
 
778
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
788
779
        # XXX: TODO check the trailer result.
789
780
        if 'hpss' in debug.debug_flags:
790
781
            mutter('              %d body bytes read',
797
788
 
798
789
    def query_version(self):
799
790
        """Return protocol version number of the server."""
800
 
        self.call(b'hello')
 
791
        self.call('hello')
801
792
        resp = self.read_response_tuple()
802
 
        if resp == (b'ok', b'1'):
 
793
        if resp == ('ok', '1'):
803
794
            return 1
804
 
        elif resp == (b'ok', b'2'):
 
795
        elif resp == ('ok', '2'):
805
796
            return 2
806
797
        else:
807
798
            raise errors.SmartProtocolError("bad response %r" % (resp,))
839
830
        response_status = self._request.read_line()
840
831
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
841
832
        self._response_is_unknown_method(result)
842
 
        if response_status == b'success\n':
 
833
        if response_status == 'success\n':
843
834
            self.response_status = True
844
835
            if not expect_body:
845
836
                self._request.finished_reading()
846
837
            return result
847
 
        elif response_status == b'failed\n':
 
838
        elif response_status == 'failed\n':
848
839
            self.response_status = False
849
840
            self._request.finished_reading()
850
841
            raise errors.ErrorFromSmartServer(result)
867
858
        _body_decoder = ChunkedBodyDecoder()
868
859
        while not _body_decoder.finished_reading:
869
860
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
870
 
            if bytes == b'':
 
861
            if bytes == '':
871
862
                # end of file encountered reading from server
872
863
                raise errors.ConnectionReset(
873
864
                    "Connection lost while reading streamed body.")
874
865
            _body_decoder.accept_bytes(bytes)
875
866
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
876
 
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
 
867
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
877
868
                    mutter('              %d byte chunk read',
878
869
                           len(body_bytes))
879
870
                yield body_bytes
916
907
            _StatefulDecoder.accept_bytes(self, bytes)
917
908
        except KeyboardInterrupt:
918
909
            raise
919
 
        except errors.SmartMessageHandlerError as exception:
 
910
        except errors.SmartMessageHandlerError, exception:
920
911
            # We do *not* set self.decoding_failed here.  The message handler
921
912
            # has raised an error, but the decoder is still able to parse bytes
922
913
            # and determine when this message ends.
926
917
            # The state machine is ready to continue decoding, but the
927
918
            # exception has interrupted the loop that runs the state machine.
928
919
            # So we call accept_bytes again to restart it.
929
 
            self.accept_bytes(b'')
930
 
        except Exception as exception:
 
920
            self.accept_bytes('')
 
921
        except Exception, exception:
931
922
            # The decoder itself has raised an exception.  We cannot continue
932
923
            # decoding.
933
924
            self.decoding_failed = True
974
965
            # The buffer is empty
975
966
            raise _NeedMoreBytes(1)
976
967
        in_buf = self._get_in_buffer()
977
 
        one_byte = in_buf[0:1]
 
968
        one_byte = in_buf[0]
978
969
        self._set_in_buffer(in_buf[1:])
979
970
        return one_byte
980
971
 
1001
992
 
1002
993
    def _state_accept_expecting_headers(self):
1003
994
        decoded = self._extract_prefixed_bencoded_data()
1004
 
        if not isinstance(decoded, dict):
 
995
        if type(decoded) is not dict:
1005
996
            raise errors.SmartProtocolError(
1006
997
                'Header object %r is not a dict' % (decoded,))
1007
998
        self.state_accept = self._state_accept_expecting_message_part
1012
1003
 
1013
1004
    def _state_accept_expecting_message_part(self):
1014
1005
        message_part_kind = self._extract_single_byte()
1015
 
        if message_part_kind == b'o':
 
1006
        if message_part_kind == 'o':
1016
1007
            self.state_accept = self._state_accept_expecting_one_byte
1017
 
        elif message_part_kind == b's':
 
1008
        elif message_part_kind == 's':
1018
1009
            self.state_accept = self._state_accept_expecting_structure
1019
 
        elif message_part_kind == b'b':
 
1010
        elif message_part_kind == 'b':
1020
1011
            self.state_accept = self._state_accept_expecting_bytes
1021
 
        elif message_part_kind == b'e':
 
1012
        elif message_part_kind == 'e':
1022
1013
            self.done()
1023
1014
        else:
1024
1015
            raise errors.SmartProtocolError(
1090
1081
        self._real_write_func = write_func
1091
1082
 
1092
1083
    def _write_func(self, bytes):
 
1084
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1085
        #       for total number of bytes to write, rather than buffer based on
 
1086
        #       the number of write() calls
1093
1087
        # TODO: Another possibility would be to turn this into an async model.
1094
1088
        #       Where we let another thread know that we have some bytes if
1095
1089
        #       they want it, but we don't actually block for it
1102
1096
 
1103
1097
    def flush(self):
1104
1098
        if self._buf:
1105
 
            self._real_write_func(b''.join(self._buf))
 
1099
            self._real_write_func(''.join(self._buf))
1106
1100
            del self._buf[:]
1107
1101
            self._buf_len = 0
1108
1102
 
1110
1104
        """Serialise a readv offset list."""
1111
1105
        txt = []
1112
1106
        for start, length in offsets:
1113
 
            txt.append(b'%d,%d' % (start, length))
1114
 
        return b'\n'.join(txt)
 
1107
            txt.append('%d,%d' % (start, length))
 
1108
        return '\n'.join(txt)
1115
1109
 
1116
1110
    def _write_protocol_version(self):
1117
1111
        self._write_func(MESSAGE_VERSION_THREE)
1125
1119
        self._write_prefixed_bencode(headers)
1126
1120
 
1127
1121
    def _write_structure(self, args):
1128
 
        self._write_func(b's')
 
1122
        self._write_func('s')
1129
1123
        utf8_args = []
1130
1124
        for arg in args:
1131
 
            if isinstance(arg, text_type):
 
1125
            if type(arg) is unicode:
1132
1126
                utf8_args.append(arg.encode('utf8'))
1133
1127
            else:
1134
1128
                utf8_args.append(arg)
1135
1129
        self._write_prefixed_bencode(utf8_args)
1136
1130
 
1137
1131
    def _write_end(self):
1138
 
        self._write_func(b'e')
 
1132
        self._write_func('e')
1139
1133
        self.flush()
1140
1134
 
1141
1135
    def _write_prefixed_body(self, bytes):
1142
 
        self._write_func(b'b')
 
1136
        self._write_func('b')
1143
1137
        self._write_func(struct.pack('!L', len(bytes)))
1144
1138
        self._write_func(bytes)
1145
1139
 
1146
1140
    def _write_chunked_body_start(self):
1147
 
        self._write_func(b'oC')
 
1141
        self._write_func('oC')
1148
1142
 
1149
1143
    def _write_error_status(self):
1150
 
        self._write_func(b'oE')
 
1144
        self._write_func('oE')
1151
1145
 
1152
1146
    def _write_success_status(self):
1153
 
        self._write_func(b'oS')
 
1147
        self._write_func('oS')
1154
1148
 
1155
1149
 
1156
1150
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1158
1152
    def __init__(self, write_func):
1159
1153
        _ProtocolThreeEncoder.__init__(self, write_func)
1160
1154
        self.response_sent = False
1161
 
        self._headers = {
1162
 
                b'Software version': breezy.__version__.encode('utf-8')}
 
1155
        self._headers = {'Software version': bzrlib.__version__}
1163
1156
        if 'hpss' in debug.debug_flags:
1164
 
            self._thread_id = _thread.get_ident()
 
1157
            self._thread_id = thread.get_ident()
1165
1158
            self._response_start_time = None
1166
1159
 
1167
1160
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1187
1180
                % (exception,))
1188
1181
        if isinstance(exception, errors.UnknownSmartMethod):
1189
1182
            failure = request.FailedSmartServerResponse(
1190
 
                (b'UnknownMethod', exception.verb))
 
1183
                ('UnknownMethod', exception.verb))
1191
1184
            self.send_response(failure)
1192
1185
            return
1193
1186
        if 'hpss' in debug.debug_flags:
1196
1189
        self._write_protocol_version()
1197
1190
        self._write_headers(self._headers)
1198
1191
        self._write_error_status()
1199
 
        self._write_structure((b'error', str(exception).encode('utf-8', 'replace')))
 
1192
        self._write_structure(('error', str(exception)))
1200
1193
        self._write_end()
1201
1194
 
1202
1195
    def send_response(self, response):
1238
1231
                    if first_chunk is None:
1239
1232
                        first_chunk = chunk
1240
1233
                    self._write_prefixed_body(chunk)
1241
 
                    self.flush()
1242
1234
                    if 'hpssdetail' in debug.debug_flags:
1243
1235
                        # Not worth timing separately, as _write_func is
1244
1236
                        # actually buffered
1281
1273
    iterator = iter(iterable)
1282
1274
    while True:
1283
1275
        try:
1284
 
            yield None, next(iterator)
 
1276
            yield None, iterator.next()
1285
1277
        except StopIteration:
1286
1278
            return
1287
1279
        except (KeyboardInterrupt, SystemExit):
1299
1291
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1300
1292
        self._medium_request = medium_request
1301
1293
        self._headers = {}
1302
 
        self.body_stream_started = None
1303
1294
 
1304
1295
    def set_headers(self, headers):
1305
1296
        self._headers = headers.copy()
1340
1331
        """Make a remote call with a readv array.
1341
1332
 
1342
1333
        The body is encoded with one line per readv offset pair. The numbers in
1343
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
1334
        each pair are separated by a comma, and no trailing \n is emitted.
1344
1335
        """
1345
1336
        if 'hpss' in debug.debug_flags:
1346
1337
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1365
1356
            if path is not None:
1366
1357
                mutter('                  (to %s)', path)
1367
1358
            self._request_start_time = osutils.timer_func()
1368
 
        self.body_stream_started = False
1369
1359
        self._write_protocol_version()
1370
1360
        self._write_headers(self._headers)
1371
1361
        self._write_structure(args)
1373
1363
        #       have finished sending the stream.  We would notice at the end
1374
1364
        #       anyway, but if the medium can deliver it early then it's good
1375
1365
        #       to short-circuit the whole request...
1376
 
        # Provoke any ConnectionReset failures before we start the body stream.
1377
 
        self.flush()
1378
 
        self.body_stream_started = True
1379
1366
        for exc_info, part in _iter_with_errors(stream):
1380
1367
            if exc_info is not None:
1381
1368
                # Iterating the stream failed.  Cleanly abort the request.
1382
1369
                self._write_error_status()
1383
1370
                # Currently the client unconditionally sends ('error',) as the
1384
1371
                # error args.
1385
 
                self._write_structure((b'error',))
 
1372
                self._write_structure(('error',))
1386
1373
                self._write_end()
1387
1374
                self._medium_request.finished_writing()
1388
 
                try:
1389
 
                    reraise(*exc_info)
1390
 
                finally:
1391
 
                    del exc_info
 
1375
                raise exc_info[0], exc_info[1], exc_info[2]
1392
1376
            else:
1393
1377
                self._write_prefixed_body(part)
1394
1378
                self.flush()