/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
client and server.
19
19
"""
20
20
 
21
 
from __future__ import absolute_import
22
 
 
23
 
try:
24
 
    from collections.abc import deque
25
 
except ImportError:  # python < 3.7
26
 
    from collections import deque
27
 
 
 
21
import collections
 
22
from cStringIO import StringIO
28
23
import struct
29
24
import sys
30
 
try:
31
 
    import _thread
32
 
except ImportError:
33
 
    import thread as _thread
 
25
import thread
 
26
import threading
34
27
import time
35
28
 
36
 
import breezy
37
 
from ... import (
 
29
import bzrlib
 
30
from bzrlib import (
38
31
    debug,
39
32
    errors,
40
33
    osutils,
41
34
    )
42
 
from ...sixish import (
43
 
    BytesIO,
44
 
    reraise,
45
 
)
46
 
from . import message, request
47
 
from ...sixish import text_type
48
 
from ...trace import log_exception_quietly, mutter
49
 
from ...bencode import bdecode_as_tuple, bencode
 
35
from bzrlib.smart import message, request
 
36
from bzrlib.trace import log_exception_quietly, mutter
 
37
from bzrlib.bencode import bdecode_as_tuple, bencode
50
38
 
51
39
 
52
40
# Protocol version strings.  These are sent as prefixes of bzr requests and
53
41
# responses to identify the protocol version being used. (There are no version
54
42
# one strings because that version doesn't send any).
55
 
REQUEST_VERSION_TWO = b'bzr request 2\n'
56
 
RESPONSE_VERSION_TWO = b'bzr response 2\n'
 
43
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
44
RESPONSE_VERSION_TWO = 'bzr response 2\n'
57
45
 
58
 
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
 
46
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
59
47
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
60
48
 
61
49
 
65
53
 
66
54
 
67
55
def _decode_tuple(req_line):
68
 
    if req_line is None or req_line == b'':
 
56
    if req_line is None or req_line == '':
69
57
        return None
70
 
    if not req_line.endswith(b'\n'):
 
58
    if req_line[-1] != '\n':
71
59
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
72
 
    return tuple(req_line[:-1].split(b'\x01'))
 
60
    return tuple(req_line[:-1].split('\x01'))
73
61
 
74
62
 
75
63
def _encode_tuple(args):
76
64
    """Encode the tuple args to a bytestream."""
77
 
    for arg in args:
78
 
        if isinstance(arg, text_type):
79
 
            raise TypeError(args)
80
 
    return b'\x01'.join(args) + b'\n'
 
65
    joined = '\x01'.join(args) + '\n'
 
66
    if type(joined) is unicode:
 
67
        # XXX: We should fix things so this never happens!  -AJB, 20100304
 
68
        mutter('response args contain unicode, should be only bytes: %r',
 
69
               joined)
 
70
        joined = joined.encode('ascii')
 
71
    return joined
81
72
 
82
73
 
83
74
class Requester(object):
121
112
    # support multiple chunks?
122
113
    def _encode_bulk_data(self, body):
123
114
        """Encode body as a bulk data chunk."""
124
 
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
 
115
        return ''.join(('%d\n' % len(body), body, 'done\n'))
125
116
 
126
117
    def _serialise_offsets(self, offsets):
127
118
        """Serialise a readv offset list."""
128
119
        txt = []
129
120
        for start, length in offsets:
130
 
            txt.append(b'%d,%d' % (start, length))
131
 
        return b'\n'.join(txt)
 
121
            txt.append('%d,%d' % (start, length))
 
122
        return '\n'.join(txt)
132
123
 
133
124
 
134
125
class SmartServerRequestProtocolOne(SmartProtocolBase):
135
126
    """Server-side encoding and decoding logic for smart version 1."""
136
127
 
137
128
    def __init__(self, backing_transport, write_func, root_client_path='/',
138
 
                 jail_root=None):
 
129
            jail_root=None):
139
130
        self._backing_transport = backing_transport
140
131
        self._root_client_path = root_client_path
141
132
        self._jail_root = jail_root
142
 
        self.unused_data = b''
 
133
        self.unused_data = ''
143
134
        self._finished = False
144
 
        self.in_buffer = b''
 
135
        self.in_buffer = ''
145
136
        self._has_dispatched = False
146
137
        self.request = None
147
138
        self._body_decoder = None
148
139
        self._write_func = write_func
149
140
 
150
 
    def accept_bytes(self, data):
 
141
    def accept_bytes(self, bytes):
151
142
        """Take bytes, and advance the internal state machine appropriately.
152
143
 
153
 
        :param data: must be a byte string
 
144
        :param bytes: must be a byte string
154
145
        """
155
 
        if not isinstance(data, bytes):
156
 
            raise ValueError(data)
157
 
        self.in_buffer += data
 
146
        if not isinstance(bytes, str):
 
147
            raise ValueError(bytes)
 
148
        self.in_buffer += bytes
158
149
        if not self._has_dispatched:
159
 
            if b'\n' not in self.in_buffer:
 
150
            if '\n' not in self.in_buffer:
160
151
                # no command line yet
161
152
                return
162
153
            self._has_dispatched = True
163
154
            try:
164
 
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
165
 
                first_line += b'\n'
 
155
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
156
                first_line += '\n'
166
157
                req_args = _decode_tuple(first_line)
167
158
                self.request = request.SmartServerRequestHandler(
168
159
                    self._backing_transport, commands=request.request_handlers,
172
163
                if self.request.finished_reading:
173
164
                    # trivial request
174
165
                    self.unused_data = self.in_buffer
175
 
                    self.in_buffer = b''
 
166
                    self.in_buffer = ''
176
167
                    self._send_response(self.request.response)
177
168
            except KeyboardInterrupt:
178
169
                raise
179
 
            except errors.UnknownSmartMethod as err:
 
170
            except errors.UnknownSmartMethod, err:
180
171
                protocol_error = errors.SmartProtocolError(
181
 
                    "bad request '%s'" % (err.verb.decode('ascii'),))
 
172
                    "bad request %r" % (err.verb,))
182
173
                failure = request.FailedSmartServerResponse(
183
 
                    (b'error', str(protocol_error).encode('utf-8')))
 
174
                    ('error', str(protocol_error)))
184
175
                self._send_response(failure)
185
176
                return
186
 
            except Exception as exception:
 
177
            except Exception, exception:
187
178
                # everything else: pass to client, flush, and quit
188
179
                log_exception_quietly()
189
180
                self._send_response(request.FailedSmartServerResponse(
190
 
                    (b'error', str(exception).encode('utf-8'))))
 
181
                    ('error', str(exception))))
191
182
                return
192
183
 
193
184
        if self._has_dispatched:
195
186
                # nothing to do.XXX: this routine should be a single state
196
187
                # machine too.
197
188
                self.unused_data += self.in_buffer
198
 
                self.in_buffer = b''
 
189
                self.in_buffer = ''
199
190
                return
200
191
            if self._body_decoder is None:
201
192
                self._body_decoder = LengthPrefixedBodyDecoder()
210
201
            if self.request.response is not None:
211
202
                self._send_response(self.request.response)
212
203
                self.unused_data = self.in_buffer
213
 
                self.in_buffer = b''
 
204
                self.in_buffer = ''
214
205
            else:
215
206
                if self.request.finished_reading:
216
207
                    raise AssertionError(
227
218
        self._write_success_or_failure_prefix(response)
228
219
        self._write_func(_encode_tuple(args))
229
220
        if body is not None:
230
 
            if not isinstance(body, bytes):
 
221
            if not isinstance(body, str):
231
222
                raise ValueError(body)
232
 
            data = self._encode_bulk_data(body)
233
 
            self._write_func(data)
 
223
            bytes = self._encode_bulk_data(body)
 
224
            self._write_func(bytes)
234
225
 
235
226
    def _write_protocol_version(self):
236
227
        """Write any prefixes this protocol requires.
267
258
    def _write_success_or_failure_prefix(self, response):
268
259
        """Write the protocol specific success/failure prefix."""
269
260
        if response.is_successful():
270
 
            self._write_func(b'success\n')
 
261
            self._write_func('success\n')
271
262
        else:
272
 
            self._write_func(b'failed\n')
 
263
            self._write_func('failed\n')
273
264
 
274
265
    def _write_protocol_version(self):
275
266
        r"""Write any prefixes this protocol requires.
287
278
        self._write_success_or_failure_prefix(response)
288
279
        self._write_func(_encode_tuple(response.args))
289
280
        if response.body is not None:
290
 
            if not isinstance(response.body, bytes):
291
 
                raise AssertionError('body must be bytes')
 
281
            if not isinstance(response.body, str):
 
282
                raise AssertionError('body must be a str')
292
283
            if not (response.body_stream is None):
293
284
                raise AssertionError(
294
285
                    'body_stream and body cannot both be set')
295
 
            data = self._encode_bulk_data(response.body)
296
 
            self._write_func(data)
 
286
            bytes = self._encode_bulk_data(response.body)
 
287
            self._write_func(bytes)
297
288
        elif response.body_stream is not None:
298
289
            _send_stream(response.body_stream, self._write_func)
299
290
 
300
291
 
301
292
def _send_stream(stream, write_func):
302
 
    write_func(b'chunked\n')
 
293
    write_func('chunked\n')
303
294
    _send_chunks(stream, write_func)
304
 
    write_func(b'END\n')
 
295
    write_func('END\n')
305
296
 
306
297
 
307
298
def _send_chunks(stream, write_func):
308
299
    for chunk in stream:
309
 
        if isinstance(chunk, bytes):
310
 
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
311
 
            write_func(data)
 
300
        if isinstance(chunk, str):
 
301
            bytes = "%x\n%s" % (len(chunk), chunk)
 
302
            write_func(bytes)
312
303
        elif isinstance(chunk, request.FailedSmartServerResponse):
313
 
            write_func(b'ERR\n')
 
304
            write_func('ERR\n')
314
305
            _send_chunks(chunk.args, write_func)
315
306
            return
316
307
        else:
348
339
        self.finished_reading = False
349
340
        self._in_buffer_list = []
350
341
        self._in_buffer_len = 0
351
 
        self.unused_data = b''
 
342
        self.unused_data = ''
352
343
        self.bytes_left = None
353
344
        self._number_needed_bytes = None
354
345
 
355
346
    def _get_in_buffer(self):
356
347
        if len(self._in_buffer_list) == 1:
357
348
            return self._in_buffer_list[0]
358
 
        in_buffer = b''.join(self._in_buffer_list)
 
349
        in_buffer = ''.join(self._in_buffer_list)
359
350
        if len(in_buffer) != self._in_buffer_len:
360
351
            raise AssertionError(
361
352
                "Length of buffer did not match expected value: %s != %s"
374
365
        # check if we can yield the bytes from just the first entry in our list
375
366
        if len(self._in_buffer_list) == 0:
376
367
            raise AssertionError('Callers must be sure we have buffered bytes'
377
 
                                 ' before calling _get_in_bytes')
 
368
                ' before calling _get_in_bytes')
378
369
        if len(self._in_buffer_list[0]) > count:
379
370
            return self._in_buffer_list[0][:count]
380
371
        # We can't yield it from the first buffer, so collapse all buffers, and
384
375
 
385
376
    def _set_in_buffer(self, new_buf):
386
377
        if new_buf is not None:
387
 
            if not isinstance(new_buf, bytes):
388
 
                raise TypeError(new_buf)
389
378
            self._in_buffer_list = [new_buf]
390
379
            self._in_buffer_len = len(new_buf)
391
380
        else:
392
381
            self._in_buffer_list = []
393
382
            self._in_buffer_len = 0
394
383
 
395
 
    def accept_bytes(self, new_buf):
 
384
    def accept_bytes(self, bytes):
396
385
        """Decode as much of bytes as possible.
397
386
 
398
 
        If 'new_buf' contains too much data it will be appended to
 
387
        If 'bytes' contains too much data it will be appended to
399
388
        self.unused_data.
400
389
 
401
390
        finished_reading will be set when no more data is required.  Further
402
391
        data will be appended to self.unused_data.
403
392
        """
404
 
        if not isinstance(new_buf, bytes):
405
 
            raise TypeError(new_buf)
406
393
        # accept_bytes is allowed to change the state
407
394
        self._number_needed_bytes = None
408
395
        # lsprof puts a very large amount of time on this specific call for
409
396
        # large readv arrays
410
 
        self._in_buffer_list.append(new_buf)
411
 
        self._in_buffer_len += len(new_buf)
 
397
        self._in_buffer_list.append(bytes)
 
398
        self._in_buffer_len += len(bytes)
412
399
        try:
413
400
            # Run the function for the current state.
414
401
            current_state = self.state_accept
421
408
                #     _NeedMoreBytes).
422
409
                current_state = self.state_accept
423
410
                self.state_accept()
424
 
        except _NeedMoreBytes as e:
 
411
        except _NeedMoreBytes, e:
425
412
            self._number_needed_bytes = e.count
426
413
 
427
414
 
436
423
        _StatefulDecoder.__init__(self)
437
424
        self.state_accept = self._state_accept_expecting_header
438
425
        self.chunk_in_progress = None
439
 
        self.chunks = deque()
 
426
        self.chunks = collections.deque()
440
427
        self.error = False
441
428
        self.error_in_progress = None
442
429
 
471
458
 
472
459
    def _extract_line(self):
473
460
        in_buf = self._get_in_buffer()
474
 
        pos = in_buf.find(b'\n')
 
461
        pos = in_buf.find('\n')
475
462
        if pos == -1:
476
463
            # We haven't read a complete line yet, so request more bytes before
477
464
            # we continue.
478
465
            raise _NeedMoreBytes(1)
479
466
        line = in_buf[:pos]
480
467
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
481
 
        self._set_in_buffer(in_buf[pos + 1:])
 
468
        self._set_in_buffer(in_buf[pos+1:])
482
469
        return line
483
470
 
484
471
    def _finished(self):
494
481
 
495
482
    def _state_accept_expecting_header(self):
496
483
        prefix = self._extract_line()
497
 
        if prefix == b'chunked':
 
484
        if prefix == 'chunked':
498
485
            self.state_accept = self._state_accept_expecting_length
499
486
        else:
500
487
            raise errors.SmartProtocolError(
502
489
 
503
490
    def _state_accept_expecting_length(self):
504
491
        prefix = self._extract_line()
505
 
        if prefix == b'ERR':
 
492
        if prefix == 'ERR':
506
493
            self.error = True
507
494
            self.error_in_progress = []
508
495
            self._state_accept_expecting_length()
509
496
            return
510
 
        elif prefix == b'END':
 
497
        elif prefix == 'END':
511
498
            # We've read the end-of-body marker.
512
499
            # Any further bytes are unused data, including the bytes left in
513
500
            # the _in_buffer.
515
502
            return
516
503
        else:
517
504
            self.bytes_left = int(prefix, 16)
518
 
            self.chunk_in_progress = b''
 
505
            self.chunk_in_progress = ''
519
506
            self.state_accept = self._state_accept_reading_chunk
520
507
 
521
508
    def _state_accept_reading_chunk(self):
546
533
        _StatefulDecoder.__init__(self)
547
534
        self.state_accept = self._state_accept_expecting_length
548
535
        self.state_read = self._state_read_no_data
549
 
        self._body = b''
550
 
        self._trailer_buffer = b''
 
536
        self._body = ''
 
537
        self._trailer_buffer = ''
551
538
 
552
539
    def next_read_size(self):
553
540
        if self.bytes_left is not None:
571
558
 
572
559
    def _state_accept_expecting_length(self):
573
560
        in_buf = self._get_in_buffer()
574
 
        pos = in_buf.find(b'\n')
 
561
        pos = in_buf.find('\n')
575
562
        if pos == -1:
576
563
            return
577
564
        self.bytes_left = int(in_buf[:pos])
578
 
        self._set_in_buffer(in_buf[pos + 1:])
 
565
        self._set_in_buffer(in_buf[pos+1:])
579
566
        self.state_accept = self._state_accept_reading_body
580
567
        self.state_read = self._state_read_body_buffer
581
568
 
597
584
        self._set_in_buffer(None)
598
585
        # TODO: what if the trailer does not match "done\n"?  Should this raise
599
586
        # a ProtocolViolation exception?
600
 
        if self._trailer_buffer.startswith(b'done\n'):
601
 
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
 
587
        if self._trailer_buffer.startswith('done\n'):
 
588
            self.unused_data = self._trailer_buffer[len('done\n'):]
602
589
            self.state_accept = self._state_accept_reading_unused
603
590
            self.finished_reading = True
604
591
 
607
594
        self._set_in_buffer(None)
608
595
 
609
596
    def _state_read_no_data(self):
610
 
        return b''
 
597
        return ''
611
598
 
612
599
    def _state_read_body_buffer(self):
613
600
        result = self._body
614
 
        self._body = b''
 
601
        self._body = ''
615
602
        return result
616
603
 
617
604
 
639
626
            mutter('hpss call:   %s', repr(args)[1:-1])
640
627
            if getattr(self._request._medium, 'base', None) is not None:
641
628
                mutter('             (to %s)', self._request._medium.base)
642
 
            self._request_start_time = osutils.perf_counter()
 
629
            self._request_start_time = osutils.timer_func()
643
630
        self._write_args(args)
644
631
        self._request.finished_writing()
645
632
        self._last_verb = args[0]
652
639
        if 'hpss' in debug.debug_flags:
653
640
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
654
641
            if getattr(self._request._medium, '_path', None) is not None:
655
 
                mutter('                  (to %s)',
656
 
                       self._request._medium._path)
 
642
                mutter('                  (to %s)', self._request._medium._path)
657
643
            mutter('              %d bytes', len(body))
658
 
            self._request_start_time = osutils.perf_counter()
 
644
            self._request_start_time = osutils.timer_func()
659
645
            if 'hpssdetail' in debug.debug_flags:
660
646
                mutter('hpss body content: %s', body)
661
647
        self._write_args(args)
668
654
        """Make a remote call with a readv array.
669
655
 
670
656
        The body is encoded with one line per readv offset pair. The numbers in
671
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
657
        each pair are separated by a comma, and no trailing \n is emitted.
672
658
        """
673
659
        if 'hpss' in debug.debug_flags:
674
660
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
675
661
            if getattr(self._request._medium, '_path', None) is not None:
676
 
                mutter('                  (to %s)',
677
 
                       self._request._medium._path)
678
 
            self._request_start_time = osutils.perf_counter()
 
662
                mutter('                  (to %s)', self._request._medium._path)
 
663
            self._request_start_time = osutils.timer_func()
679
664
        self._write_args(args)
680
665
        readv_bytes = self._serialise_offsets(body)
681
666
        bytes = self._encode_bulk_data(readv_bytes)
707
692
        if 'hpss' in debug.debug_flags:
708
693
            if self._request_start_time is not None:
709
694
                mutter('   result:   %6.3fs  %s',
710
 
                       osutils.perf_counter() - self._request_start_time,
 
695
                       osutils.timer_func() - self._request_start_time,
711
696
                       repr(result)[1:-1])
712
697
                self._request_start_time = None
713
698
            else:
733
718
        # returned in response to existing version 1 smart requests.  Responses
734
719
        # starting with these codes are always "failed" responses.
735
720
        v1_error_codes = [
736
 
            b'norepository',
737
 
            b'NoSuchFile',
738
 
            b'FileExists',
739
 
            b'DirectoryNotEmpty',
740
 
            b'ShortReadvError',
741
 
            b'UnicodeEncodeError',
742
 
            b'UnicodeDecodeError',
743
 
            b'ReadOnlyError',
744
 
            b'nobranch',
745
 
            b'NoSuchRevision',
746
 
            b'nosuchrevision',
747
 
            b'LockContention',
748
 
            b'UnlockableTransport',
749
 
            b'LockFailed',
750
 
            b'TokenMismatch',
751
 
            b'ReadError',
752
 
            b'PermissionDenied',
 
721
            'norepository',
 
722
            'NoSuchFile',
 
723
            'FileExists',
 
724
            'DirectoryNotEmpty',
 
725
            'ShortReadvError',
 
726
            'UnicodeEncodeError',
 
727
            'UnicodeDecodeError',
 
728
            'ReadOnlyError',
 
729
            'nobranch',
 
730
            'NoSuchRevision',
 
731
            'nosuchrevision',
 
732
            'LockContention',
 
733
            'UnlockableTransport',
 
734
            'LockFailed',
 
735
            'TokenMismatch',
 
736
            'ReadError',
 
737
            'PermissionDenied',
753
738
            ]
754
739
        if result_tuple[0] in v1_error_codes:
755
740
            self._request.finished_reading()
764
749
        :param verb: The verb used in that call.
765
750
        :raises: UnexpectedSmartServerResponse
766
751
        """
767
 
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
768
 
                             b"bad request '" + self._last_verb + b"'")
769
 
            or result_tuple == (b'error', b"Generic bzr smart protocol error: "
770
 
                                b"bad request u'%s'" % self._last_verb)):
 
752
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
 
753
                "bad request '%s'" % self._last_verb) or
 
754
              result_tuple == ('error', "Generic bzr smart protocol error: "
 
755
                "bad request u'%s'" % self._last_verb)):
771
756
            # The response will have no body, so we've finished reading.
772
757
            self._request.finished_reading()
773
758
            raise errors.UnknownSmartMethod(self._last_verb)
784
769
 
785
770
        while not _body_decoder.finished_reading:
786
771
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
787
 
            if bytes == b'':
 
772
            if bytes == '':
788
773
                # end of file encountered reading from server
789
774
                raise errors.ConnectionReset(
790
775
                    "Connection lost while reading response body.")
791
776
            _body_decoder.accept_bytes(bytes)
792
777
        self._request.finished_reading()
793
 
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
 
778
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
794
779
        # XXX: TODO check the trailer result.
795
780
        if 'hpss' in debug.debug_flags:
796
781
            mutter('              %d body bytes read',
803
788
 
804
789
    def query_version(self):
805
790
        """Return protocol version number of the server."""
806
 
        self.call(b'hello')
 
791
        self.call('hello')
807
792
        resp = self.read_response_tuple()
808
 
        if resp == (b'ok', b'1'):
 
793
        if resp == ('ok', '1'):
809
794
            return 1
810
 
        elif resp == (b'ok', b'2'):
 
795
        elif resp == ('ok', '2'):
811
796
            return 2
812
797
        else:
813
798
            raise errors.SmartProtocolError("bad response %r" % (resp,))
845
830
        response_status = self._request.read_line()
846
831
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
847
832
        self._response_is_unknown_method(result)
848
 
        if response_status == b'success\n':
 
833
        if response_status == 'success\n':
849
834
            self.response_status = True
850
835
            if not expect_body:
851
836
                self._request.finished_reading()
852
837
            return result
853
 
        elif response_status == b'failed\n':
 
838
        elif response_status == 'failed\n':
854
839
            self.response_status = False
855
840
            self._request.finished_reading()
856
841
            raise errors.ErrorFromSmartServer(result)
873
858
        _body_decoder = ChunkedBodyDecoder()
874
859
        while not _body_decoder.finished_reading:
875
860
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
876
 
            if bytes == b'':
 
861
            if bytes == '':
877
862
                # end of file encountered reading from server
878
863
                raise errors.ConnectionReset(
879
864
                    "Connection lost while reading streamed body.")
880
865
            _body_decoder.accept_bytes(bytes)
881
866
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
882
 
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
 
867
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
883
868
                    mutter('              %d byte chunk read',
884
869
                           len(body_bytes))
885
870
                yield body_bytes
892
877
        backing_transport, commands=request.request_handlers,
893
878
        root_client_path=root_client_path, jail_root=jail_root)
894
879
    responder = ProtocolThreeResponder(write_func)
895
 
    message_handler = message.ConventionalRequestHandler(
896
 
        request_handler, responder)
 
880
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
897
881
    return ProtocolThreeDecoder(message_handler)
898
882
 
899
883
 
923
907
            _StatefulDecoder.accept_bytes(self, bytes)
924
908
        except KeyboardInterrupt:
925
909
            raise
926
 
        except errors.SmartMessageHandlerError as exception:
 
910
        except errors.SmartMessageHandlerError, exception:
927
911
            # We do *not* set self.decoding_failed here.  The message handler
928
912
            # has raised an error, but the decoder is still able to parse bytes
929
913
            # and determine when this message ends.
933
917
            # The state machine is ready to continue decoding, but the
934
918
            # exception has interrupted the loop that runs the state machine.
935
919
            # So we call accept_bytes again to restart it.
936
 
            self.accept_bytes(b'')
937
 
        except Exception as exception:
 
920
            self.accept_bytes('')
 
921
        except Exception, exception:
938
922
            # The decoder itself has raised an exception.  We cannot continue
939
923
            # decoding.
940
924
            self.decoding_failed = True
981
965
            # The buffer is empty
982
966
            raise _NeedMoreBytes(1)
983
967
        in_buf = self._get_in_buffer()
984
 
        one_byte = in_buf[0:1]
 
968
        one_byte = in_buf[0]
985
969
        self._set_in_buffer(in_buf[1:])
986
970
        return one_byte
987
971
 
1008
992
 
1009
993
    def _state_accept_expecting_headers(self):
1010
994
        decoded = self._extract_prefixed_bencoded_data()
1011
 
        if not isinstance(decoded, dict):
 
995
        if type(decoded) is not dict:
1012
996
            raise errors.SmartProtocolError(
1013
997
                'Header object %r is not a dict' % (decoded,))
1014
998
        self.state_accept = self._state_accept_expecting_message_part
1019
1003
 
1020
1004
    def _state_accept_expecting_message_part(self):
1021
1005
        message_part_kind = self._extract_single_byte()
1022
 
        if message_part_kind == b'o':
 
1006
        if message_part_kind == 'o':
1023
1007
            self.state_accept = self._state_accept_expecting_one_byte
1024
 
        elif message_part_kind == b's':
 
1008
        elif message_part_kind == 's':
1025
1009
            self.state_accept = self._state_accept_expecting_structure
1026
 
        elif message_part_kind == b'b':
 
1010
        elif message_part_kind == 'b':
1027
1011
            self.state_accept = self._state_accept_expecting_bytes
1028
 
        elif message_part_kind == b'e':
 
1012
        elif message_part_kind == 'e':
1029
1013
            self.done()
1030
1014
        else:
1031
1015
            raise errors.SmartProtocolError(
1089
1073
class _ProtocolThreeEncoder(object):
1090
1074
 
1091
1075
    response_marker = request_marker = MESSAGE_VERSION_THREE
1092
 
    BUFFER_SIZE = 1024 * 1024  # 1 MiB buffer before flushing
 
1076
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1093
1077
 
1094
1078
    def __init__(self, write_func):
1095
1079
        self._buf = []
1097
1081
        self._real_write_func = write_func
1098
1082
 
1099
1083
    def _write_func(self, bytes):
 
1084
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1085
        #       for total number of bytes to write, rather than buffer based on
 
1086
        #       the number of write() calls
1100
1087
        # TODO: Another possibility would be to turn this into an async model.
1101
1088
        #       Where we let another thread know that we have some bytes if
1102
1089
        #       they want it, but we don't actually block for it
1109
1096
 
1110
1097
    def flush(self):
1111
1098
        if self._buf:
1112
 
            self._real_write_func(b''.join(self._buf))
 
1099
            self._real_write_func(''.join(self._buf))
1113
1100
            del self._buf[:]
1114
1101
            self._buf_len = 0
1115
1102
 
1117
1104
        """Serialise a readv offset list."""
1118
1105
        txt = []
1119
1106
        for start, length in offsets:
1120
 
            txt.append(b'%d,%d' % (start, length))
1121
 
        return b'\n'.join(txt)
 
1107
            txt.append('%d,%d' % (start, length))
 
1108
        return '\n'.join(txt)
1122
1109
 
1123
1110
    def _write_protocol_version(self):
1124
1111
        self._write_func(MESSAGE_VERSION_THREE)
1132
1119
        self._write_prefixed_bencode(headers)
1133
1120
 
1134
1121
    def _write_structure(self, args):
1135
 
        self._write_func(b's')
 
1122
        self._write_func('s')
1136
1123
        utf8_args = []
1137
1124
        for arg in args:
1138
 
            if isinstance(arg, text_type):
 
1125
            if type(arg) is unicode:
1139
1126
                utf8_args.append(arg.encode('utf8'))
1140
1127
            else:
1141
1128
                utf8_args.append(arg)
1142
1129
        self._write_prefixed_bencode(utf8_args)
1143
1130
 
1144
1131
    def _write_end(self):
1145
 
        self._write_func(b'e')
 
1132
        self._write_func('e')
1146
1133
        self.flush()
1147
1134
 
1148
1135
    def _write_prefixed_body(self, bytes):
1149
 
        self._write_func(b'b')
 
1136
        self._write_func('b')
1150
1137
        self._write_func(struct.pack('!L', len(bytes)))
1151
1138
        self._write_func(bytes)
1152
1139
 
1153
1140
    def _write_chunked_body_start(self):
1154
 
        self._write_func(b'oC')
 
1141
        self._write_func('oC')
1155
1142
 
1156
1143
    def _write_error_status(self):
1157
 
        self._write_func(b'oE')
 
1144
        self._write_func('oE')
1158
1145
 
1159
1146
    def _write_success_status(self):
1160
 
        self._write_func(b'oS')
 
1147
        self._write_func('oS')
1161
1148
 
1162
1149
 
1163
1150
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1165
1152
    def __init__(self, write_func):
1166
1153
        _ProtocolThreeEncoder.__init__(self, write_func)
1167
1154
        self.response_sent = False
1168
 
        self._headers = {
1169
 
            b'Software version': breezy.__version__.encode('utf-8')}
 
1155
        self._headers = {'Software version': bzrlib.__version__}
1170
1156
        if 'hpss' in debug.debug_flags:
1171
 
            self._thread_id = _thread.get_ident()
 
1157
            self._thread_id = thread.get_ident()
1172
1158
            self._response_start_time = None
1173
1159
 
1174
1160
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1175
1161
        if self._response_start_time is None:
1176
 
            self._response_start_time = osutils.perf_counter()
 
1162
            self._response_start_time = osutils.timer_func()
1177
1163
        if include_time:
1178
 
            t = '%5.3fs ' % (osutils.perf_counter() - self._response_start_time)
 
1164
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
1179
1165
        else:
1180
1166
            t = ''
1181
1167
        if extra_bytes is None:
1194
1180
                % (exception,))
1195
1181
        if isinstance(exception, errors.UnknownSmartMethod):
1196
1182
            failure = request.FailedSmartServerResponse(
1197
 
                (b'UnknownMethod', exception.verb))
 
1183
                ('UnknownMethod', exception.verb))
1198
1184
            self.send_response(failure)
1199
1185
            return
1200
1186
        if 'hpss' in debug.debug_flags:
1203
1189
        self._write_protocol_version()
1204
1190
        self._write_headers(self._headers)
1205
1191
        self._write_error_status()
1206
 
        self._write_structure(
1207
 
            (b'error', str(exception).encode('utf-8', 'replace')))
 
1192
        self._write_structure(('error', str(exception)))
1208
1193
        self._write_end()
1209
1194
 
1210
1195
    def send_response(self, response):
1289
1274
    iterator = iter(iterable)
1290
1275
    while True:
1291
1276
        try:
1292
 
            yield None, next(iterator)
 
1277
            yield None, iterator.next()
1293
1278
        except StopIteration:
1294
1279
            return
1295
1280
        except (KeyboardInterrupt, SystemExit):
1307
1292
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1308
1293
        self._medium_request = medium_request
1309
1294
        self._headers = {}
1310
 
        self.body_stream_started = None
1311
1295
 
1312
1296
    def set_headers(self, headers):
1313
1297
        self._headers = headers.copy()
1318
1302
            base = getattr(self._medium_request._medium, 'base', None)
1319
1303
            if base is not None:
1320
1304
                mutter('             (to %s)', base)
1321
 
            self._request_start_time = osutils.perf_counter()
 
1305
            self._request_start_time = osutils.timer_func()
1322
1306
        self._write_protocol_version()
1323
1307
        self._write_headers(self._headers)
1324
1308
        self._write_structure(args)
1336
1320
            if path is not None:
1337
1321
                mutter('                  (to %s)', path)
1338
1322
            mutter('              %d bytes', len(body))
1339
 
            self._request_start_time = osutils.perf_counter()
 
1323
            self._request_start_time = osutils.timer_func()
1340
1324
        self._write_protocol_version()
1341
1325
        self._write_headers(self._headers)
1342
1326
        self._write_structure(args)
1348
1332
        """Make a remote call with a readv array.
1349
1333
 
1350
1334
        The body is encoded with one line per readv offset pair. The numbers in
1351
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
1335
        each pair are separated by a comma, and no trailing \n is emitted.
1352
1336
        """
1353
1337
        if 'hpss' in debug.debug_flags:
1354
1338
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1355
1339
            path = getattr(self._medium_request._medium, '_path', None)
1356
1340
            if path is not None:
1357
1341
                mutter('                  (to %s)', path)
1358
 
            self._request_start_time = osutils.perf_counter()
 
1342
            self._request_start_time = osutils.timer_func()
1359
1343
        self._write_protocol_version()
1360
1344
        self._write_headers(self._headers)
1361
1345
        self._write_structure(args)
1372
1356
            path = getattr(self._medium_request._medium, '_path', None)
1373
1357
            if path is not None:
1374
1358
                mutter('                  (to %s)', path)
1375
 
            self._request_start_time = osutils.perf_counter()
1376
 
        self.body_stream_started = False
 
1359
            self._request_start_time = osutils.timer_func()
1377
1360
        self._write_protocol_version()
1378
1361
        self._write_headers(self._headers)
1379
1362
        self._write_structure(args)
1381
1364
        #       have finished sending the stream.  We would notice at the end
1382
1365
        #       anyway, but if the medium can deliver it early then it's good
1383
1366
        #       to short-circuit the whole request...
1384
 
        # Provoke any ConnectionReset failures before we start the body stream.
1385
 
        self.flush()
1386
 
        self.body_stream_started = True
1387
1367
        for exc_info, part in _iter_with_errors(stream):
1388
1368
            if exc_info is not None:
1389
1369
                # Iterating the stream failed.  Cleanly abort the request.
1390
1370
                self._write_error_status()
1391
1371
                # Currently the client unconditionally sends ('error',) as the
1392
1372
                # error args.
1393
 
                self._write_structure((b'error',))
 
1373
                self._write_structure(('error',))
1394
1374
                self._write_end()
1395
1375
                self._medium_request.finished_writing()
1396
 
                try:
1397
 
                    reraise(*exc_info)
1398
 
                finally:
1399
 
                    del exc_info
 
1376
                raise exc_info[0], exc_info[1], exc_info[2]
1400
1377
            else:
1401
1378
                self._write_prefixed_body(part)
1402
1379
                self.flush()
1403
1380
        self._write_end()
1404
1381
        self._medium_request.finished_writing()
 
1382