/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Vincent Ladeuil
  • Date: 2009-09-02 08:26:27 UTC
  • mto: (4669.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4670.
  • Revision ID: v.ladeuil+lp@free.fr-20090902082627-cit8vm6tefu9hwk2
Cleanup emacs-bzr-send-XXXXXX.el leaks in /tmp during selftest.

* tests/test_mail_client.py:
(TestEmacsMail.test_commandline,
TestEmacsMail.test_commandline_is_8bit): Cleanup the generated tmp
file.

* mail_client.py:
(EmacsMail.__init__, EmacsMail._get_compose_commandline): Keep
track of the tmp file to ease cleanup during testing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
18
18
client and server.
19
19
"""
20
20
 
21
 
from __future__ import absolute_import
22
 
 
23
 
try:
24
 
    from collections.abc import deque
25
 
except ImportError:  # python < 3.7
26
 
    from collections import deque
27
 
 
 
21
import collections
 
22
from cStringIO import StringIO
28
23
import struct
29
24
import sys
30
 
try:
31
 
    import _thread
32
 
except ImportError:
33
 
    import thread as _thread
34
25
import time
35
26
 
36
 
import breezy
37
 
from ... import (
38
 
    debug,
39
 
    errors,
40
 
    osutils,
41
 
    )
42
 
from ...sixish import (
43
 
    BytesIO,
44
 
    reraise,
45
 
)
46
 
from . import message, request
47
 
from ...sixish import text_type
48
 
from ...trace import log_exception_quietly, mutter
49
 
from ...bencode import bdecode_as_tuple, bencode
 
27
import bzrlib
 
28
from bzrlib import debug
 
29
from bzrlib import errors
 
30
from bzrlib.smart import message, request
 
31
from bzrlib.trace import log_exception_quietly, mutter
 
32
from bzrlib.bencode import bdecode_as_tuple, bencode
50
33
 
51
34
 
52
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
53
36
# responses to identify the protocol version being used. (There are no version
54
37
# one strings because that version doesn't send any).
55
 
REQUEST_VERSION_TWO = b'bzr request 2\n'
56
 
RESPONSE_VERSION_TWO = b'bzr response 2\n'
 
38
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
39
RESPONSE_VERSION_TWO = 'bzr response 2\n'
57
40
 
58
 
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
 
41
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
59
42
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
60
43
 
61
44
 
65
48
 
66
49
 
67
50
def _decode_tuple(req_line):
68
 
    if req_line is None or req_line == b'':
 
51
    if req_line is None or req_line == '':
69
52
        return None
70
 
    if not req_line.endswith(b'\n'):
 
53
    if req_line[-1] != '\n':
71
54
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
72
 
    return tuple(req_line[:-1].split(b'\x01'))
 
55
    return tuple(req_line[:-1].split('\x01'))
73
56
 
74
57
 
75
58
def _encode_tuple(args):
76
59
    """Encode the tuple args to a bytestream."""
77
 
    for arg in args:
78
 
        if isinstance(arg, text_type):
79
 
            raise TypeError(args)
80
 
    return b'\x01'.join(args) + b'\n'
 
60
    return '\x01'.join(args) + '\n'
81
61
 
82
62
 
83
63
class Requester(object):
121
101
    # support multiple chunks?
122
102
    def _encode_bulk_data(self, body):
123
103
        """Encode body as a bulk data chunk."""
124
 
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
 
104
        return ''.join(('%d\n' % len(body), body, 'done\n'))
125
105
 
126
106
    def _serialise_offsets(self, offsets):
127
107
        """Serialise a readv offset list."""
128
108
        txt = []
129
109
        for start, length in offsets:
130
 
            txt.append(b'%d,%d' % (start, length))
131
 
        return b'\n'.join(txt)
 
110
            txt.append('%d,%d' % (start, length))
 
111
        return '\n'.join(txt)
132
112
 
133
113
 
134
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
135
115
    """Server-side encoding and decoding logic for smart version 1."""
136
116
 
137
 
    def __init__(self, backing_transport, write_func, root_client_path='/',
138
 
                 jail_root=None):
 
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
139
118
        self._backing_transport = backing_transport
140
119
        self._root_client_path = root_client_path
141
 
        self._jail_root = jail_root
142
 
        self.unused_data = b''
 
120
        self.unused_data = ''
143
121
        self._finished = False
144
 
        self.in_buffer = b''
 
122
        self.in_buffer = ''
145
123
        self._has_dispatched = False
146
124
        self.request = None
147
125
        self._body_decoder = None
148
126
        self._write_func = write_func
149
127
 
150
 
    def accept_bytes(self, data):
 
128
    def accept_bytes(self, bytes):
151
129
        """Take bytes, and advance the internal state machine appropriately.
152
130
 
153
 
        :param data: must be a byte string
 
131
        :param bytes: must be a byte string
154
132
        """
155
 
        if not isinstance(data, bytes):
156
 
            raise ValueError(data)
157
 
        self.in_buffer += data
 
133
        if not isinstance(bytes, str):
 
134
            raise ValueError(bytes)
 
135
        self.in_buffer += bytes
158
136
        if not self._has_dispatched:
159
 
            if b'\n' not in self.in_buffer:
 
137
            if '\n' not in self.in_buffer:
160
138
                # no command line yet
161
139
                return
162
140
            self._has_dispatched = True
163
141
            try:
164
 
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
165
 
                first_line += b'\n'
 
142
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
143
                first_line += '\n'
166
144
                req_args = _decode_tuple(first_line)
167
145
                self.request = request.SmartServerRequestHandler(
168
146
                    self._backing_transport, commands=request.request_handlers,
169
 
                    root_client_path=self._root_client_path,
170
 
                    jail_root=self._jail_root)
171
 
                self.request.args_received(req_args)
 
147
                    root_client_path=self._root_client_path)
 
148
                self.request.dispatch_command(req_args[0], req_args[1:])
172
149
                if self.request.finished_reading:
173
150
                    # trivial request
174
151
                    self.unused_data = self.in_buffer
175
 
                    self.in_buffer = b''
 
152
                    self.in_buffer = ''
176
153
                    self._send_response(self.request.response)
177
154
            except KeyboardInterrupt:
178
155
                raise
179
 
            except errors.UnknownSmartMethod as err:
 
156
            except errors.UnknownSmartMethod, err:
180
157
                protocol_error = errors.SmartProtocolError(
181
 
                    "bad request '%s'" % (err.verb.decode('ascii'),))
 
158
                    "bad request %r" % (err.verb,))
182
159
                failure = request.FailedSmartServerResponse(
183
 
                    (b'error', str(protocol_error).encode('utf-8')))
 
160
                    ('error', str(protocol_error)))
184
161
                self._send_response(failure)
185
162
                return
186
 
            except Exception as exception:
 
163
            except Exception, exception:
187
164
                # everything else: pass to client, flush, and quit
188
165
                log_exception_quietly()
189
166
                self._send_response(request.FailedSmartServerResponse(
190
 
                    (b'error', str(exception).encode('utf-8'))))
 
167
                    ('error', str(exception))))
191
168
                return
192
169
 
193
170
        if self._has_dispatched:
195
172
                # nothing to do.XXX: this routine should be a single state
196
173
                # machine too.
197
174
                self.unused_data += self.in_buffer
198
 
                self.in_buffer = b''
 
175
                self.in_buffer = ''
199
176
                return
200
177
            if self._body_decoder is None:
201
178
                self._body_decoder = LengthPrefixedBodyDecoder()
210
187
            if self.request.response is not None:
211
188
                self._send_response(self.request.response)
212
189
                self.unused_data = self.in_buffer
213
 
                self.in_buffer = b''
 
190
                self.in_buffer = ''
214
191
            else:
215
192
                if self.request.finished_reading:
216
193
                    raise AssertionError(
227
204
        self._write_success_or_failure_prefix(response)
228
205
        self._write_func(_encode_tuple(args))
229
206
        if body is not None:
230
 
            if not isinstance(body, bytes):
 
207
            if not isinstance(body, str):
231
208
                raise ValueError(body)
232
 
            data = self._encode_bulk_data(body)
233
 
            self._write_func(data)
 
209
            bytes = self._encode_bulk_data(body)
 
210
            self._write_func(bytes)
234
211
 
235
212
    def _write_protocol_version(self):
236
213
        """Write any prefixes this protocol requires.
267
244
    def _write_success_or_failure_prefix(self, response):
268
245
        """Write the protocol specific success/failure prefix."""
269
246
        if response.is_successful():
270
 
            self._write_func(b'success\n')
 
247
            self._write_func('success\n')
271
248
        else:
272
 
            self._write_func(b'failed\n')
 
249
            self._write_func('failed\n')
273
250
 
274
251
    def _write_protocol_version(self):
275
252
        r"""Write any prefixes this protocol requires.
287
264
        self._write_success_or_failure_prefix(response)
288
265
        self._write_func(_encode_tuple(response.args))
289
266
        if response.body is not None:
290
 
            if not isinstance(response.body, bytes):
291
 
                raise AssertionError('body must be bytes')
 
267
            if not isinstance(response.body, str):
 
268
                raise AssertionError('body must be a str')
292
269
            if not (response.body_stream is None):
293
270
                raise AssertionError(
294
271
                    'body_stream and body cannot both be set')
295
 
            data = self._encode_bulk_data(response.body)
296
 
            self._write_func(data)
 
272
            bytes = self._encode_bulk_data(response.body)
 
273
            self._write_func(bytes)
297
274
        elif response.body_stream is not None:
298
275
            _send_stream(response.body_stream, self._write_func)
299
276
 
300
277
 
301
278
def _send_stream(stream, write_func):
302
 
    write_func(b'chunked\n')
 
279
    write_func('chunked\n')
303
280
    _send_chunks(stream, write_func)
304
 
    write_func(b'END\n')
 
281
    write_func('END\n')
305
282
 
306
283
 
307
284
def _send_chunks(stream, write_func):
308
285
    for chunk in stream:
309
 
        if isinstance(chunk, bytes):
310
 
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
311
 
            write_func(data)
 
286
        if isinstance(chunk, str):
 
287
            bytes = "%x\n%s" % (len(chunk), chunk)
 
288
            write_func(bytes)
312
289
        elif isinstance(chunk, request.FailedSmartServerResponse):
313
 
            write_func(b'ERR\n')
 
290
            write_func('ERR\n')
314
291
            _send_chunks(chunk.args, write_func)
315
292
            return
316
293
        else:
348
325
        self.finished_reading = False
349
326
        self._in_buffer_list = []
350
327
        self._in_buffer_len = 0
351
 
        self.unused_data = b''
 
328
        self.unused_data = ''
352
329
        self.bytes_left = None
353
330
        self._number_needed_bytes = None
354
331
 
355
332
    def _get_in_buffer(self):
356
333
        if len(self._in_buffer_list) == 1:
357
334
            return self._in_buffer_list[0]
358
 
        in_buffer = b''.join(self._in_buffer_list)
 
335
        in_buffer = ''.join(self._in_buffer_list)
359
336
        if len(in_buffer) != self._in_buffer_len:
360
337
            raise AssertionError(
361
338
                "Length of buffer did not match expected value: %s != %s"
374
351
        # check if we can yield the bytes from just the first entry in our list
375
352
        if len(self._in_buffer_list) == 0:
376
353
            raise AssertionError('Callers must be sure we have buffered bytes'
377
 
                                 ' before calling _get_in_bytes')
 
354
                ' before calling _get_in_bytes')
378
355
        if len(self._in_buffer_list[0]) > count:
379
356
            return self._in_buffer_list[0][:count]
380
357
        # We can't yield it from the first buffer, so collapse all buffers, and
384
361
 
385
362
    def _set_in_buffer(self, new_buf):
386
363
        if new_buf is not None:
387
 
            if not isinstance(new_buf, bytes):
388
 
                raise TypeError(new_buf)
389
364
            self._in_buffer_list = [new_buf]
390
365
            self._in_buffer_len = len(new_buf)
391
366
        else:
392
367
            self._in_buffer_list = []
393
368
            self._in_buffer_len = 0
394
369
 
395
 
    def accept_bytes(self, new_buf):
 
370
    def accept_bytes(self, bytes):
396
371
        """Decode as much of bytes as possible.
397
372
 
398
 
        If 'new_buf' contains too much data it will be appended to
 
373
        If 'bytes' contains too much data it will be appended to
399
374
        self.unused_data.
400
375
 
401
376
        finished_reading will be set when no more data is required.  Further
402
377
        data will be appended to self.unused_data.
403
378
        """
404
 
        if not isinstance(new_buf, bytes):
405
 
            raise TypeError(new_buf)
406
379
        # accept_bytes is allowed to change the state
407
380
        self._number_needed_bytes = None
408
381
        # lsprof puts a very large amount of time on this specific call for
409
382
        # large readv arrays
410
 
        self._in_buffer_list.append(new_buf)
411
 
        self._in_buffer_len += len(new_buf)
 
383
        self._in_buffer_list.append(bytes)
 
384
        self._in_buffer_len += len(bytes)
412
385
        try:
413
386
            # Run the function for the current state.
414
387
            current_state = self.state_accept
421
394
                #     _NeedMoreBytes).
422
395
                current_state = self.state_accept
423
396
                self.state_accept()
424
 
        except _NeedMoreBytes as e:
 
397
        except _NeedMoreBytes, e:
425
398
            self._number_needed_bytes = e.count
426
399
 
427
400
 
436
409
        _StatefulDecoder.__init__(self)
437
410
        self.state_accept = self._state_accept_expecting_header
438
411
        self.chunk_in_progress = None
439
 
        self.chunks = deque()
 
412
        self.chunks = collections.deque()
440
413
        self.error = False
441
414
        self.error_in_progress = None
442
415
 
471
444
 
472
445
    def _extract_line(self):
473
446
        in_buf = self._get_in_buffer()
474
 
        pos = in_buf.find(b'\n')
 
447
        pos = in_buf.find('\n')
475
448
        if pos == -1:
476
449
            # We haven't read a complete line yet, so request more bytes before
477
450
            # we continue.
478
451
            raise _NeedMoreBytes(1)
479
452
        line = in_buf[:pos]
480
453
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
481
 
        self._set_in_buffer(in_buf[pos + 1:])
 
454
        self._set_in_buffer(in_buf[pos+1:])
482
455
        return line
483
456
 
484
457
    def _finished(self):
494
467
 
495
468
    def _state_accept_expecting_header(self):
496
469
        prefix = self._extract_line()
497
 
        if prefix == b'chunked':
 
470
        if prefix == 'chunked':
498
471
            self.state_accept = self._state_accept_expecting_length
499
472
        else:
500
473
            raise errors.SmartProtocolError(
502
475
 
503
476
    def _state_accept_expecting_length(self):
504
477
        prefix = self._extract_line()
505
 
        if prefix == b'ERR':
 
478
        if prefix == 'ERR':
506
479
            self.error = True
507
480
            self.error_in_progress = []
508
481
            self._state_accept_expecting_length()
509
482
            return
510
 
        elif prefix == b'END':
 
483
        elif prefix == 'END':
511
484
            # We've read the end-of-body marker.
512
485
            # Any further bytes are unused data, including the bytes left in
513
486
            # the _in_buffer.
515
488
            return
516
489
        else:
517
490
            self.bytes_left = int(prefix, 16)
518
 
            self.chunk_in_progress = b''
 
491
            self.chunk_in_progress = ''
519
492
            self.state_accept = self._state_accept_reading_chunk
520
493
 
521
494
    def _state_accept_reading_chunk(self):
546
519
        _StatefulDecoder.__init__(self)
547
520
        self.state_accept = self._state_accept_expecting_length
548
521
        self.state_read = self._state_read_no_data
549
 
        self._body = b''
550
 
        self._trailer_buffer = b''
 
522
        self._body = ''
 
523
        self._trailer_buffer = ''
551
524
 
552
525
    def next_read_size(self):
553
526
        if self.bytes_left is not None:
571
544
 
572
545
    def _state_accept_expecting_length(self):
573
546
        in_buf = self._get_in_buffer()
574
 
        pos = in_buf.find(b'\n')
 
547
        pos = in_buf.find('\n')
575
548
        if pos == -1:
576
549
            return
577
550
        self.bytes_left = int(in_buf[:pos])
578
 
        self._set_in_buffer(in_buf[pos + 1:])
 
551
        self._set_in_buffer(in_buf[pos+1:])
579
552
        self.state_accept = self._state_accept_reading_body
580
553
        self.state_read = self._state_read_body_buffer
581
554
 
597
570
        self._set_in_buffer(None)
598
571
        # TODO: what if the trailer does not match "done\n"?  Should this raise
599
572
        # a ProtocolViolation exception?
600
 
        if self._trailer_buffer.startswith(b'done\n'):
601
 
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
 
573
        if self._trailer_buffer.startswith('done\n'):
 
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
602
575
            self.state_accept = self._state_accept_reading_unused
603
576
            self.finished_reading = True
604
577
 
607
580
        self._set_in_buffer(None)
608
581
 
609
582
    def _state_read_no_data(self):
610
 
        return b''
 
583
        return ''
611
584
 
612
585
    def _state_read_body_buffer(self):
613
586
        result = self._body
614
 
        self._body = b''
 
587
        self._body = ''
615
588
        return result
616
589
 
617
590
 
639
612
            mutter('hpss call:   %s', repr(args)[1:-1])
640
613
            if getattr(self._request._medium, 'base', None) is not None:
641
614
                mutter('             (to %s)', self._request._medium.base)
642
 
            self._request_start_time = osutils.perf_counter()
 
615
            self._request_start_time = time.time()
643
616
        self._write_args(args)
644
617
        self._request.finished_writing()
645
618
        self._last_verb = args[0]
652
625
        if 'hpss' in debug.debug_flags:
653
626
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
654
627
            if getattr(self._request._medium, '_path', None) is not None:
655
 
                mutter('                  (to %s)',
656
 
                       self._request._medium._path)
 
628
                mutter('                  (to %s)', self._request._medium._path)
657
629
            mutter('              %d bytes', len(body))
658
 
            self._request_start_time = osutils.perf_counter()
 
630
            self._request_start_time = time.time()
659
631
            if 'hpssdetail' in debug.debug_flags:
660
632
                mutter('hpss body content: %s', body)
661
633
        self._write_args(args)
668
640
        """Make a remote call with a readv array.
669
641
 
670
642
        The body is encoded with one line per readv offset pair. The numbers in
671
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
643
        each pair are separated by a comma, and no trailing \n is emitted.
672
644
        """
673
645
        if 'hpss' in debug.debug_flags:
674
646
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
675
647
            if getattr(self._request._medium, '_path', None) is not None:
676
 
                mutter('                  (to %s)',
677
 
                       self._request._medium._path)
678
 
            self._request_start_time = osutils.perf_counter()
 
648
                mutter('                  (to %s)', self._request._medium._path)
 
649
            self._request_start_time = time.time()
679
650
        self._write_args(args)
680
651
        readv_bytes = self._serialise_offsets(body)
681
652
        bytes = self._encode_bulk_data(readv_bytes)
707
678
        if 'hpss' in debug.debug_flags:
708
679
            if self._request_start_time is not None:
709
680
                mutter('   result:   %6.3fs  %s',
710
 
                       osutils.perf_counter() - self._request_start_time,
 
681
                       time.time() - self._request_start_time,
711
682
                       repr(result)[1:-1])
712
683
                self._request_start_time = None
713
684
            else:
733
704
        # returned in response to existing version 1 smart requests.  Responses
734
705
        # starting with these codes are always "failed" responses.
735
706
        v1_error_codes = [
736
 
            b'norepository',
737
 
            b'NoSuchFile',
738
 
            b'FileExists',
739
 
            b'DirectoryNotEmpty',
740
 
            b'ShortReadvError',
741
 
            b'UnicodeEncodeError',
742
 
            b'UnicodeDecodeError',
743
 
            b'ReadOnlyError',
744
 
            b'nobranch',
745
 
            b'NoSuchRevision',
746
 
            b'nosuchrevision',
747
 
            b'LockContention',
748
 
            b'UnlockableTransport',
749
 
            b'LockFailed',
750
 
            b'TokenMismatch',
751
 
            b'ReadError',
752
 
            b'PermissionDenied',
 
707
            'norepository',
 
708
            'NoSuchFile',
 
709
            'FileExists',
 
710
            'DirectoryNotEmpty',
 
711
            'ShortReadvError',
 
712
            'UnicodeEncodeError',
 
713
            'UnicodeDecodeError',
 
714
            'ReadOnlyError',
 
715
            'nobranch',
 
716
            'NoSuchRevision',
 
717
            'nosuchrevision',
 
718
            'LockContention',
 
719
            'UnlockableTransport',
 
720
            'LockFailed',
 
721
            'TokenMismatch',
 
722
            'ReadError',
 
723
            'PermissionDenied',
753
724
            ]
754
725
        if result_tuple[0] in v1_error_codes:
755
726
            self._request.finished_reading()
764
735
        :param verb: The verb used in that call.
765
736
        :raises: UnexpectedSmartServerResponse
766
737
        """
767
 
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
768
 
                             b"bad request '" + self._last_verb + b"'")
769
 
            or result_tuple == (b'error', b"Generic bzr smart protocol error: "
770
 
                                b"bad request u'%s'" % self._last_verb)):
 
738
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
 
739
                "bad request '%s'" % self._last_verb) or
 
740
              result_tuple == ('error', "Generic bzr smart protocol error: "
 
741
                "bad request u'%s'" % self._last_verb)):
771
742
            # The response will have no body, so we've finished reading.
772
743
            self._request.finished_reading()
773
744
            raise errors.UnknownSmartMethod(self._last_verb)
784
755
 
785
756
        while not _body_decoder.finished_reading:
786
757
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
787
 
            if bytes == b'':
 
758
            if bytes == '':
788
759
                # end of file encountered reading from server
789
760
                raise errors.ConnectionReset(
790
761
                    "Connection lost while reading response body.")
791
762
            _body_decoder.accept_bytes(bytes)
792
763
        self._request.finished_reading()
793
 
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
 
764
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
794
765
        # XXX: TODO check the trailer result.
795
766
        if 'hpss' in debug.debug_flags:
796
767
            mutter('              %d body bytes read',
803
774
 
804
775
    def query_version(self):
805
776
        """Return protocol version number of the server."""
806
 
        self.call(b'hello')
 
777
        self.call('hello')
807
778
        resp = self.read_response_tuple()
808
 
        if resp == (b'ok', b'1'):
 
779
        if resp == ('ok', '1'):
809
780
            return 1
810
 
        elif resp == (b'ok', b'2'):
 
781
        elif resp == ('ok', '2'):
811
782
            return 2
812
783
        else:
813
784
            raise errors.SmartProtocolError("bad response %r" % (resp,))
845
816
        response_status = self._request.read_line()
846
817
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
847
818
        self._response_is_unknown_method(result)
848
 
        if response_status == b'success\n':
 
819
        if response_status == 'success\n':
849
820
            self.response_status = True
850
821
            if not expect_body:
851
822
                self._request.finished_reading()
852
823
            return result
853
 
        elif response_status == b'failed\n':
 
824
        elif response_status == 'failed\n':
854
825
            self.response_status = False
855
826
            self._request.finished_reading()
856
827
            raise errors.ErrorFromSmartServer(result)
873
844
        _body_decoder = ChunkedBodyDecoder()
874
845
        while not _body_decoder.finished_reading:
875
846
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
876
 
            if bytes == b'':
 
847
            if bytes == '':
877
848
                # end of file encountered reading from server
878
849
                raise errors.ConnectionReset(
879
850
                    "Connection lost while reading streamed body.")
880
851
            _body_decoder.accept_bytes(bytes)
881
852
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
882
 
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
 
853
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
883
854
                    mutter('              %d byte chunk read',
884
855
                           len(body_bytes))
885
856
                yield body_bytes
887
858
 
888
859
 
889
860
def build_server_protocol_three(backing_transport, write_func,
890
 
                                root_client_path, jail_root=None):
 
861
                                root_client_path):
891
862
    request_handler = request.SmartServerRequestHandler(
892
863
        backing_transport, commands=request.request_handlers,
893
 
        root_client_path=root_client_path, jail_root=jail_root)
 
864
        root_client_path=root_client_path)
894
865
    responder = ProtocolThreeResponder(write_func)
895
 
    message_handler = message.ConventionalRequestHandler(
896
 
        request_handler, responder)
 
866
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
897
867
    return ProtocolThreeDecoder(message_handler)
898
868
 
899
869
 
923
893
            _StatefulDecoder.accept_bytes(self, bytes)
924
894
        except KeyboardInterrupt:
925
895
            raise
926
 
        except errors.SmartMessageHandlerError as exception:
 
896
        except errors.SmartMessageHandlerError, exception:
927
897
            # We do *not* set self.decoding_failed here.  The message handler
928
898
            # has raised an error, but the decoder is still able to parse bytes
929
899
            # and determine when this message ends.
933
903
            # The state machine is ready to continue decoding, but the
934
904
            # exception has interrupted the loop that runs the state machine.
935
905
            # So we call accept_bytes again to restart it.
936
 
            self.accept_bytes(b'')
937
 
        except Exception as exception:
 
906
            self.accept_bytes('')
 
907
        except Exception, exception:
938
908
            # The decoder itself has raised an exception.  We cannot continue
939
909
            # decoding.
940
910
            self.decoding_failed = True
981
951
            # The buffer is empty
982
952
            raise _NeedMoreBytes(1)
983
953
        in_buf = self._get_in_buffer()
984
 
        one_byte = in_buf[0:1]
 
954
        one_byte = in_buf[0]
985
955
        self._set_in_buffer(in_buf[1:])
986
956
        return one_byte
987
957
 
1008
978
 
1009
979
    def _state_accept_expecting_headers(self):
1010
980
        decoded = self._extract_prefixed_bencoded_data()
1011
 
        if not isinstance(decoded, dict):
 
981
        if type(decoded) is not dict:
1012
982
            raise errors.SmartProtocolError(
1013
983
                'Header object %r is not a dict' % (decoded,))
1014
984
        self.state_accept = self._state_accept_expecting_message_part
1019
989
 
1020
990
    def _state_accept_expecting_message_part(self):
1021
991
        message_part_kind = self._extract_single_byte()
1022
 
        if message_part_kind == b'o':
 
992
        if message_part_kind == 'o':
1023
993
            self.state_accept = self._state_accept_expecting_one_byte
1024
 
        elif message_part_kind == b's':
 
994
        elif message_part_kind == 's':
1025
995
            self.state_accept = self._state_accept_expecting_structure
1026
 
        elif message_part_kind == b'b':
 
996
        elif message_part_kind == 'b':
1027
997
            self.state_accept = self._state_accept_expecting_bytes
1028
 
        elif message_part_kind == b'e':
 
998
        elif message_part_kind == 'e':
1029
999
            self.done()
1030
1000
        else:
1031
1001
            raise errors.SmartProtocolError(
1089
1059
class _ProtocolThreeEncoder(object):
1090
1060
 
1091
1061
    response_marker = request_marker = MESSAGE_VERSION_THREE
1092
 
    BUFFER_SIZE = 1024 * 1024  # 1 MiB buffer before flushing
1093
1062
 
1094
1063
    def __init__(self, write_func):
1095
1064
        self._buf = []
1096
 
        self._buf_len = 0
1097
1065
        self._real_write_func = write_func
1098
1066
 
1099
1067
    def _write_func(self, bytes):
1100
 
        # TODO: Another possibility would be to turn this into an async model.
1101
 
        #       Where we let another thread know that we have some bytes if
1102
 
        #       they want it, but we don't actually block for it
1103
 
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
1104
 
        #       we might just push out smaller bits at a time?
1105
1068
        self._buf.append(bytes)
1106
 
        self._buf_len += len(bytes)
1107
 
        if self._buf_len > self.BUFFER_SIZE:
 
1069
        if len(self._buf) > 100:
1108
1070
            self.flush()
1109
1071
 
1110
1072
    def flush(self):
1111
1073
        if self._buf:
1112
 
            self._real_write_func(b''.join(self._buf))
 
1074
            self._real_write_func(''.join(self._buf))
1113
1075
            del self._buf[:]
1114
 
            self._buf_len = 0
1115
1076
 
1116
1077
    def _serialise_offsets(self, offsets):
1117
1078
        """Serialise a readv offset list."""
1118
1079
        txt = []
1119
1080
        for start, length in offsets:
1120
 
            txt.append(b'%d,%d' % (start, length))
1121
 
        return b'\n'.join(txt)
 
1081
            txt.append('%d,%d' % (start, length))
 
1082
        return '\n'.join(txt)
1122
1083
 
1123
1084
    def _write_protocol_version(self):
1124
1085
        self._write_func(MESSAGE_VERSION_THREE)
1132
1093
        self._write_prefixed_bencode(headers)
1133
1094
 
1134
1095
    def _write_structure(self, args):
1135
 
        self._write_func(b's')
 
1096
        self._write_func('s')
1136
1097
        utf8_args = []
1137
1098
        for arg in args:
1138
 
            if isinstance(arg, text_type):
 
1099
            if type(arg) is unicode:
1139
1100
                utf8_args.append(arg.encode('utf8'))
1140
1101
            else:
1141
1102
                utf8_args.append(arg)
1142
1103
        self._write_prefixed_bencode(utf8_args)
1143
1104
 
1144
1105
    def _write_end(self):
1145
 
        self._write_func(b'e')
 
1106
        self._write_func('e')
1146
1107
        self.flush()
1147
1108
 
1148
1109
    def _write_prefixed_body(self, bytes):
1149
 
        self._write_func(b'b')
 
1110
        self._write_func('b')
1150
1111
        self._write_func(struct.pack('!L', len(bytes)))
1151
1112
        self._write_func(bytes)
1152
1113
 
1153
1114
    def _write_chunked_body_start(self):
1154
 
        self._write_func(b'oC')
 
1115
        self._write_func('oC')
1155
1116
 
1156
1117
    def _write_error_status(self):
1157
 
        self._write_func(b'oE')
 
1118
        self._write_func('oE')
1158
1119
 
1159
1120
    def _write_success_status(self):
1160
 
        self._write_func(b'oS')
 
1121
        self._write_func('oS')
1161
1122
 
1162
1123
 
1163
1124
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1165
1126
    def __init__(self, write_func):
1166
1127
        _ProtocolThreeEncoder.__init__(self, write_func)
1167
1128
        self.response_sent = False
1168
 
        self._headers = {
1169
 
            b'Software version': breezy.__version__.encode('utf-8')}
1170
 
        if 'hpss' in debug.debug_flags:
1171
 
            self._thread_id = _thread.get_ident()
1172
 
            self._response_start_time = None
1173
 
 
1174
 
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1175
 
        if self._response_start_time is None:
1176
 
            self._response_start_time = osutils.perf_counter()
1177
 
        if include_time:
1178
 
            t = '%5.3fs ' % (osutils.perf_counter() - self._response_start_time)
1179
 
        else:
1180
 
            t = ''
1181
 
        if extra_bytes is None:
1182
 
            extra = ''
1183
 
        else:
1184
 
            extra = ' ' + repr(extra_bytes[:40])
1185
 
            if len(extra) > 33:
1186
 
                extra = extra[:29] + extra[-1] + '...'
1187
 
        mutter('%12s: [%s] %s%s%s'
1188
 
               % (action, self._thread_id, t, message, extra))
 
1129
        self._headers = {'Software version': bzrlib.__version__}
1189
1130
 
1190
1131
    def send_error(self, exception):
1191
1132
        if self.response_sent:
1194
1135
                % (exception,))
1195
1136
        if isinstance(exception, errors.UnknownSmartMethod):
1196
1137
            failure = request.FailedSmartServerResponse(
1197
 
                (b'UnknownMethod', exception.verb))
 
1138
                ('UnknownMethod', exception.verb))
1198
1139
            self.send_response(failure)
1199
1140
            return
1200
 
        if 'hpss' in debug.debug_flags:
1201
 
            self._trace('error', str(exception))
1202
1141
        self.response_sent = True
1203
1142
        self._write_protocol_version()
1204
1143
        self._write_headers(self._headers)
1205
1144
        self._write_error_status()
1206
 
        self._write_structure(
1207
 
            (b'error', str(exception).encode('utf-8', 'replace')))
 
1145
        self._write_structure(('error', str(exception)))
1208
1146
        self._write_end()
1209
1147
 
1210
1148
    def send_response(self, response):
1219
1157
            self._write_success_status()
1220
1158
        else:
1221
1159
            self._write_error_status()
1222
 
        if 'hpss' in debug.debug_flags:
1223
 
            self._trace('response', repr(response.args))
1224
1160
        self._write_structure(response.args)
1225
1161
        if response.body is not None:
1226
1162
            self._write_prefixed_body(response.body)
1227
 
            if 'hpss' in debug.debug_flags:
1228
 
                self._trace('body', '%d bytes' % (len(response.body),),
1229
 
                            response.body, include_time=True)
1230
1163
        elif response.body_stream is not None:
1231
 
            count = num_bytes = 0
1232
 
            first_chunk = None
1233
1164
            for exc_info, chunk in _iter_with_errors(response.body_stream):
1234
 
                count += 1
1235
1165
                if exc_info is not None:
1236
1166
                    self._write_error_status()
1237
1167
                    error_struct = request._translate_error(exc_info[1])
1242
1172
                        self._write_error_status()
1243
1173
                        self._write_structure(chunk.args)
1244
1174
                        break
1245
 
                    num_bytes += len(chunk)
1246
 
                    if first_chunk is None:
1247
 
                        first_chunk = chunk
1248
1175
                    self._write_prefixed_body(chunk)
1249
 
                    self.flush()
1250
 
                    if 'hpssdetail' in debug.debug_flags:
1251
 
                        # Not worth timing separately, as _write_func is
1252
 
                        # actually buffered
1253
 
                        self._trace('body chunk',
1254
 
                                    '%d bytes' % (len(chunk),),
1255
 
                                    chunk, suppress_time=True)
1256
 
            if 'hpss' in debug.debug_flags:
1257
 
                self._trace('body stream',
1258
 
                            '%d bytes %d chunks' % (num_bytes, count),
1259
 
                            first_chunk)
1260
1176
        self._write_end()
1261
 
        if 'hpss' in debug.debug_flags:
1262
 
            self._trace('response end', '', include_time=True)
1263
1177
 
1264
1178
 
1265
1179
def _iter_with_errors(iterable):
1289
1203
    iterator = iter(iterable)
1290
1204
    while True:
1291
1205
        try:
1292
 
            yield None, next(iterator)
 
1206
            yield None, iterator.next()
1293
1207
        except StopIteration:
1294
1208
            return
1295
1209
        except (KeyboardInterrupt, SystemExit):
1307
1221
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1308
1222
        self._medium_request = medium_request
1309
1223
        self._headers = {}
1310
 
        self.body_stream_started = None
1311
1224
 
1312
1225
    def set_headers(self, headers):
1313
1226
        self._headers = headers.copy()
1318
1231
            base = getattr(self._medium_request._medium, 'base', None)
1319
1232
            if base is not None:
1320
1233
                mutter('             (to %s)', base)
1321
 
            self._request_start_time = osutils.perf_counter()
 
1234
            self._request_start_time = time.time()
1322
1235
        self._write_protocol_version()
1323
1236
        self._write_headers(self._headers)
1324
1237
        self._write_structure(args)
1336
1249
            if path is not None:
1337
1250
                mutter('                  (to %s)', path)
1338
1251
            mutter('              %d bytes', len(body))
1339
 
            self._request_start_time = osutils.perf_counter()
 
1252
            self._request_start_time = time.time()
1340
1253
        self._write_protocol_version()
1341
1254
        self._write_headers(self._headers)
1342
1255
        self._write_structure(args)
1348
1261
        """Make a remote call with a readv array.
1349
1262
 
1350
1263
        The body is encoded with one line per readv offset pair. The numbers in
1351
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
1264
        each pair are separated by a comma, and no trailing \n is emitted.
1352
1265
        """
1353
1266
        if 'hpss' in debug.debug_flags:
1354
1267
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1355
1268
            path = getattr(self._medium_request._medium, '_path', None)
1356
1269
            if path is not None:
1357
1270
                mutter('                  (to %s)', path)
1358
 
            self._request_start_time = osutils.perf_counter()
 
1271
            self._request_start_time = time.time()
1359
1272
        self._write_protocol_version()
1360
1273
        self._write_headers(self._headers)
1361
1274
        self._write_structure(args)
1372
1285
            path = getattr(self._medium_request._medium, '_path', None)
1373
1286
            if path is not None:
1374
1287
                mutter('                  (to %s)', path)
1375
 
            self._request_start_time = osutils.perf_counter()
1376
 
        self.body_stream_started = False
 
1288
            self._request_start_time = time.time()
1377
1289
        self._write_protocol_version()
1378
1290
        self._write_headers(self._headers)
1379
1291
        self._write_structure(args)
1381
1293
        #       have finished sending the stream.  We would notice at the end
1382
1294
        #       anyway, but if the medium can deliver it early then it's good
1383
1295
        #       to short-circuit the whole request...
1384
 
        # Provoke any ConnectionReset failures before we start the body stream.
1385
 
        self.flush()
1386
 
        self.body_stream_started = True
1387
1296
        for exc_info, part in _iter_with_errors(stream):
1388
1297
            if exc_info is not None:
1389
1298
                # Iterating the stream failed.  Cleanly abort the request.
1390
1299
                self._write_error_status()
1391
1300
                # Currently the client unconditionally sends ('error',) as the
1392
1301
                # error args.
1393
 
                self._write_structure((b'error',))
 
1302
                self._write_structure(('error',))
1394
1303
                self._write_end()
1395
1304
                self._medium_request.finished_writing()
1396
 
                try:
1397
 
                    reraise(*exc_info)
1398
 
                finally:
1399
 
                    del exc_info
 
1305
                raise exc_info[0], exc_info[1], exc_info[2]
1400
1306
            else:
1401
1307
                self._write_prefixed_body(part)
1402
1308
                self.flush()
1403
1309
        self._write_end()
1404
1310
        self._medium_request.finished_writing()
 
1311