/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Ferlito
  • Date: 2009-09-02 04:31:45 UTC
  • mto: (4665.7.1 serve-init)
  • mto: This revision was merged to the branch mainline in revision 4913.
  • Revision ID: johnf@inodes.org-20090902043145-gxdsfw03ilcwbyn5
Add a debian init script for bzr --serve

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
18
18
client and server.
19
19
"""
20
20
 
21
 
try:
22
 
    from collections.abc import deque
23
 
except ImportError:  # python < 3.7
24
 
    from collections import deque
25
 
 
26
 
from io import BytesIO
 
21
import collections
 
22
from cStringIO import StringIO
27
23
import struct
28
24
import sys
29
 
import _thread
30
25
import time
31
26
 
32
 
import breezy
33
 
from ... import (
34
 
    debug,
35
 
    errors,
36
 
    osutils,
37
 
    )
38
 
from . import message, request
39
 
from ...trace import log_exception_quietly, mutter
40
 
from ...bencode import bdecode_as_tuple, bencode
 
27
import bzrlib
 
28
from bzrlib import debug
 
29
from bzrlib import errors
 
30
from bzrlib.smart import message, request
 
31
from bzrlib.trace import log_exception_quietly, mutter
 
32
from bzrlib.bencode import bdecode_as_tuple, bencode
41
33
 
42
34
 
43
35
# Protocol version strings.  These are sent as prefixes of bzr requests and
44
36
# responses to identify the protocol version being used. (There are no version
45
37
# one strings because that version doesn't send any).
46
 
REQUEST_VERSION_TWO = b'bzr request 2\n'
47
 
RESPONSE_VERSION_TWO = b'bzr response 2\n'
 
38
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
39
RESPONSE_VERSION_TWO = 'bzr response 2\n'
48
40
 
49
 
MESSAGE_VERSION_THREE = b'bzr message 3 (bzr 1.6)\n'
 
41
MESSAGE_VERSION_THREE = 'bzr message 3 (bzr 1.6)\n'
50
42
RESPONSE_VERSION_THREE = REQUEST_VERSION_THREE = MESSAGE_VERSION_THREE
51
43
 
52
44
 
56
48
 
57
49
 
58
50
def _decode_tuple(req_line):
59
 
    if req_line is None or req_line == b'':
 
51
    if req_line is None or req_line == '':
60
52
        return None
61
 
    if not req_line.endswith(b'\n'):
 
53
    if req_line[-1] != '\n':
62
54
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
63
 
    return tuple(req_line[:-1].split(b'\x01'))
 
55
    return tuple(req_line[:-1].split('\x01'))
64
56
 
65
57
 
66
58
def _encode_tuple(args):
67
59
    """Encode the tuple args to a bytestream."""
68
 
    for arg in args:
69
 
        if isinstance(arg, str):
70
 
            raise TypeError(args)
71
 
    return b'\x01'.join(args) + b'\n'
 
60
    return '\x01'.join(args) + '\n'
72
61
 
73
62
 
74
63
class Requester(object):
112
101
    # support multiple chunks?
113
102
    def _encode_bulk_data(self, body):
114
103
        """Encode body as a bulk data chunk."""
115
 
        return b''.join((b'%d\n' % len(body), body, b'done\n'))
 
104
        return ''.join(('%d\n' % len(body), body, 'done\n'))
116
105
 
117
106
    def _serialise_offsets(self, offsets):
118
107
        """Serialise a readv offset list."""
119
108
        txt = []
120
109
        for start, length in offsets:
121
 
            txt.append(b'%d,%d' % (start, length))
122
 
        return b'\n'.join(txt)
 
110
            txt.append('%d,%d' % (start, length))
 
111
        return '\n'.join(txt)
123
112
 
124
113
 
125
114
class SmartServerRequestProtocolOne(SmartProtocolBase):
126
115
    """Server-side encoding and decoding logic for smart version 1."""
127
116
 
128
 
    def __init__(self, backing_transport, write_func, root_client_path='/',
129
 
                 jail_root=None):
 
117
    def __init__(self, backing_transport, write_func, root_client_path='/'):
130
118
        self._backing_transport = backing_transport
131
119
        self._root_client_path = root_client_path
132
 
        self._jail_root = jail_root
133
 
        self.unused_data = b''
 
120
        self.unused_data = ''
134
121
        self._finished = False
135
 
        self.in_buffer = b''
 
122
        self.in_buffer = ''
136
123
        self._has_dispatched = False
137
124
        self.request = None
138
125
        self._body_decoder = None
139
126
        self._write_func = write_func
140
127
 
141
 
    def accept_bytes(self, data):
 
128
    def accept_bytes(self, bytes):
142
129
        """Take bytes, and advance the internal state machine appropriately.
143
130
 
144
 
        :param data: must be a byte string
 
131
        :param bytes: must be a byte string
145
132
        """
146
 
        if not isinstance(data, bytes):
147
 
            raise ValueError(data)
148
 
        self.in_buffer += data
 
133
        if not isinstance(bytes, str):
 
134
            raise ValueError(bytes)
 
135
        self.in_buffer += bytes
149
136
        if not self._has_dispatched:
150
 
            if b'\n' not in self.in_buffer:
 
137
            if '\n' not in self.in_buffer:
151
138
                # no command line yet
152
139
                return
153
140
            self._has_dispatched = True
154
141
            try:
155
 
                first_line, self.in_buffer = self.in_buffer.split(b'\n', 1)
156
 
                first_line += b'\n'
 
142
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
143
                first_line += '\n'
157
144
                req_args = _decode_tuple(first_line)
158
145
                self.request = request.SmartServerRequestHandler(
159
146
                    self._backing_transport, commands=request.request_handlers,
160
 
                    root_client_path=self._root_client_path,
161
 
                    jail_root=self._jail_root)
162
 
                self.request.args_received(req_args)
 
147
                    root_client_path=self._root_client_path)
 
148
                self.request.dispatch_command(req_args[0], req_args[1:])
163
149
                if self.request.finished_reading:
164
150
                    # trivial request
165
151
                    self.unused_data = self.in_buffer
166
 
                    self.in_buffer = b''
 
152
                    self.in_buffer = ''
167
153
                    self._send_response(self.request.response)
168
154
            except KeyboardInterrupt:
169
155
                raise
170
 
            except errors.UnknownSmartMethod as err:
 
156
            except errors.UnknownSmartMethod, err:
171
157
                protocol_error = errors.SmartProtocolError(
172
 
                    "bad request '%s'" % (err.verb.decode('ascii'),))
 
158
                    "bad request %r" % (err.verb,))
173
159
                failure = request.FailedSmartServerResponse(
174
 
                    (b'error', str(protocol_error).encode('utf-8')))
 
160
                    ('error', str(protocol_error)))
175
161
                self._send_response(failure)
176
162
                return
177
 
            except Exception as exception:
 
163
            except Exception, exception:
178
164
                # everything else: pass to client, flush, and quit
179
165
                log_exception_quietly()
180
166
                self._send_response(request.FailedSmartServerResponse(
181
 
                    (b'error', str(exception).encode('utf-8'))))
 
167
                    ('error', str(exception))))
182
168
                return
183
169
 
184
170
        if self._has_dispatched:
186
172
                # nothing to do.XXX: this routine should be a single state
187
173
                # machine too.
188
174
                self.unused_data += self.in_buffer
189
 
                self.in_buffer = b''
 
175
                self.in_buffer = ''
190
176
                return
191
177
            if self._body_decoder is None:
192
178
                self._body_decoder = LengthPrefixedBodyDecoder()
201
187
            if self.request.response is not None:
202
188
                self._send_response(self.request.response)
203
189
                self.unused_data = self.in_buffer
204
 
                self.in_buffer = b''
 
190
                self.in_buffer = ''
205
191
            else:
206
192
                if self.request.finished_reading:
207
193
                    raise AssertionError(
218
204
        self._write_success_or_failure_prefix(response)
219
205
        self._write_func(_encode_tuple(args))
220
206
        if body is not None:
221
 
            if not isinstance(body, bytes):
 
207
            if not isinstance(body, str):
222
208
                raise ValueError(body)
223
 
            data = self._encode_bulk_data(body)
224
 
            self._write_func(data)
 
209
            bytes = self._encode_bulk_data(body)
 
210
            self._write_func(bytes)
225
211
 
226
212
    def _write_protocol_version(self):
227
213
        """Write any prefixes this protocol requires.
258
244
    def _write_success_or_failure_prefix(self, response):
259
245
        """Write the protocol specific success/failure prefix."""
260
246
        if response.is_successful():
261
 
            self._write_func(b'success\n')
 
247
            self._write_func('success\n')
262
248
        else:
263
 
            self._write_func(b'failed\n')
 
249
            self._write_func('failed\n')
264
250
 
265
251
    def _write_protocol_version(self):
266
252
        r"""Write any prefixes this protocol requires.
278
264
        self._write_success_or_failure_prefix(response)
279
265
        self._write_func(_encode_tuple(response.args))
280
266
        if response.body is not None:
281
 
            if not isinstance(response.body, bytes):
282
 
                raise AssertionError('body must be bytes')
 
267
            if not isinstance(response.body, str):
 
268
                raise AssertionError('body must be a str')
283
269
            if not (response.body_stream is None):
284
270
                raise AssertionError(
285
271
                    'body_stream and body cannot both be set')
286
 
            data = self._encode_bulk_data(response.body)
287
 
            self._write_func(data)
 
272
            bytes = self._encode_bulk_data(response.body)
 
273
            self._write_func(bytes)
288
274
        elif response.body_stream is not None:
289
275
            _send_stream(response.body_stream, self._write_func)
290
276
 
291
277
 
292
278
def _send_stream(stream, write_func):
293
 
    write_func(b'chunked\n')
 
279
    write_func('chunked\n')
294
280
    _send_chunks(stream, write_func)
295
 
    write_func(b'END\n')
 
281
    write_func('END\n')
296
282
 
297
283
 
298
284
def _send_chunks(stream, write_func):
299
285
    for chunk in stream:
300
 
        if isinstance(chunk, bytes):
301
 
            data = ("%x\n" % len(chunk)).encode('ascii') + chunk
302
 
            write_func(data)
 
286
        if isinstance(chunk, str):
 
287
            bytes = "%x\n%s" % (len(chunk), chunk)
 
288
            write_func(bytes)
303
289
        elif isinstance(chunk, request.FailedSmartServerResponse):
304
 
            write_func(b'ERR\n')
 
290
            write_func('ERR\n')
305
291
            _send_chunks(chunk.args, write_func)
306
292
            return
307
293
        else:
339
325
        self.finished_reading = False
340
326
        self._in_buffer_list = []
341
327
        self._in_buffer_len = 0
342
 
        self.unused_data = b''
 
328
        self.unused_data = ''
343
329
        self.bytes_left = None
344
330
        self._number_needed_bytes = None
345
331
 
346
332
    def _get_in_buffer(self):
347
333
        if len(self._in_buffer_list) == 1:
348
334
            return self._in_buffer_list[0]
349
 
        in_buffer = b''.join(self._in_buffer_list)
 
335
        in_buffer = ''.join(self._in_buffer_list)
350
336
        if len(in_buffer) != self._in_buffer_len:
351
337
            raise AssertionError(
352
338
                "Length of buffer did not match expected value: %s != %s"
365
351
        # check if we can yield the bytes from just the first entry in our list
366
352
        if len(self._in_buffer_list) == 0:
367
353
            raise AssertionError('Callers must be sure we have buffered bytes'
368
 
                                 ' before calling _get_in_bytes')
 
354
                ' before calling _get_in_bytes')
369
355
        if len(self._in_buffer_list[0]) > count:
370
356
            return self._in_buffer_list[0][:count]
371
357
        # We can't yield it from the first buffer, so collapse all buffers, and
375
361
 
376
362
    def _set_in_buffer(self, new_buf):
377
363
        if new_buf is not None:
378
 
            if not isinstance(new_buf, bytes):
379
 
                raise TypeError(new_buf)
380
364
            self._in_buffer_list = [new_buf]
381
365
            self._in_buffer_len = len(new_buf)
382
366
        else:
383
367
            self._in_buffer_list = []
384
368
            self._in_buffer_len = 0
385
369
 
386
 
    def accept_bytes(self, new_buf):
 
370
    def accept_bytes(self, bytes):
387
371
        """Decode as much of bytes as possible.
388
372
 
389
 
        If 'new_buf' contains too much data it will be appended to
 
373
        If 'bytes' contains too much data it will be appended to
390
374
        self.unused_data.
391
375
 
392
376
        finished_reading will be set when no more data is required.  Further
393
377
        data will be appended to self.unused_data.
394
378
        """
395
 
        if not isinstance(new_buf, bytes):
396
 
            raise TypeError(new_buf)
397
379
        # accept_bytes is allowed to change the state
398
380
        self._number_needed_bytes = None
399
381
        # lsprof puts a very large amount of time on this specific call for
400
382
        # large readv arrays
401
 
        self._in_buffer_list.append(new_buf)
402
 
        self._in_buffer_len += len(new_buf)
 
383
        self._in_buffer_list.append(bytes)
 
384
        self._in_buffer_len += len(bytes)
403
385
        try:
404
386
            # Run the function for the current state.
405
387
            current_state = self.state_accept
412
394
                #     _NeedMoreBytes).
413
395
                current_state = self.state_accept
414
396
                self.state_accept()
415
 
        except _NeedMoreBytes as e:
 
397
        except _NeedMoreBytes, e:
416
398
            self._number_needed_bytes = e.count
417
399
 
418
400
 
427
409
        _StatefulDecoder.__init__(self)
428
410
        self.state_accept = self._state_accept_expecting_header
429
411
        self.chunk_in_progress = None
430
 
        self.chunks = deque()
 
412
        self.chunks = collections.deque()
431
413
        self.error = False
432
414
        self.error_in_progress = None
433
415
 
462
444
 
463
445
    def _extract_line(self):
464
446
        in_buf = self._get_in_buffer()
465
 
        pos = in_buf.find(b'\n')
 
447
        pos = in_buf.find('\n')
466
448
        if pos == -1:
467
449
            # We haven't read a complete line yet, so request more bytes before
468
450
            # we continue.
469
451
            raise _NeedMoreBytes(1)
470
452
        line = in_buf[:pos]
471
453
        # Trim the prefix (including '\n' delimiter) from the _in_buffer.
472
 
        self._set_in_buffer(in_buf[pos + 1:])
 
454
        self._set_in_buffer(in_buf[pos+1:])
473
455
        return line
474
456
 
475
457
    def _finished(self):
485
467
 
486
468
    def _state_accept_expecting_header(self):
487
469
        prefix = self._extract_line()
488
 
        if prefix == b'chunked':
 
470
        if prefix == 'chunked':
489
471
            self.state_accept = self._state_accept_expecting_length
490
472
        else:
491
473
            raise errors.SmartProtocolError(
493
475
 
494
476
    def _state_accept_expecting_length(self):
495
477
        prefix = self._extract_line()
496
 
        if prefix == b'ERR':
 
478
        if prefix == 'ERR':
497
479
            self.error = True
498
480
            self.error_in_progress = []
499
481
            self._state_accept_expecting_length()
500
482
            return
501
 
        elif prefix == b'END':
 
483
        elif prefix == 'END':
502
484
            # We've read the end-of-body marker.
503
485
            # Any further bytes are unused data, including the bytes left in
504
486
            # the _in_buffer.
506
488
            return
507
489
        else:
508
490
            self.bytes_left = int(prefix, 16)
509
 
            self.chunk_in_progress = b''
 
491
            self.chunk_in_progress = ''
510
492
            self.state_accept = self._state_accept_reading_chunk
511
493
 
512
494
    def _state_accept_reading_chunk(self):
537
519
        _StatefulDecoder.__init__(self)
538
520
        self.state_accept = self._state_accept_expecting_length
539
521
        self.state_read = self._state_read_no_data
540
 
        self._body = b''
541
 
        self._trailer_buffer = b''
 
522
        self._body = ''
 
523
        self._trailer_buffer = ''
542
524
 
543
525
    def next_read_size(self):
544
526
        if self.bytes_left is not None:
562
544
 
563
545
    def _state_accept_expecting_length(self):
564
546
        in_buf = self._get_in_buffer()
565
 
        pos = in_buf.find(b'\n')
 
547
        pos = in_buf.find('\n')
566
548
        if pos == -1:
567
549
            return
568
550
        self.bytes_left = int(in_buf[:pos])
569
 
        self._set_in_buffer(in_buf[pos + 1:])
 
551
        self._set_in_buffer(in_buf[pos+1:])
570
552
        self.state_accept = self._state_accept_reading_body
571
553
        self.state_read = self._state_read_body_buffer
572
554
 
588
570
        self._set_in_buffer(None)
589
571
        # TODO: what if the trailer does not match "done\n"?  Should this raise
590
572
        # a ProtocolViolation exception?
591
 
        if self._trailer_buffer.startswith(b'done\n'):
592
 
            self.unused_data = self._trailer_buffer[len(b'done\n'):]
 
573
        if self._trailer_buffer.startswith('done\n'):
 
574
            self.unused_data = self._trailer_buffer[len('done\n'):]
593
575
            self.state_accept = self._state_accept_reading_unused
594
576
            self.finished_reading = True
595
577
 
598
580
        self._set_in_buffer(None)
599
581
 
600
582
    def _state_read_no_data(self):
601
 
        return b''
 
583
        return ''
602
584
 
603
585
    def _state_read_body_buffer(self):
604
586
        result = self._body
605
 
        self._body = b''
 
587
        self._body = ''
606
588
        return result
607
589
 
608
590
 
630
612
            mutter('hpss call:   %s', repr(args)[1:-1])
631
613
            if getattr(self._request._medium, 'base', None) is not None:
632
614
                mutter('             (to %s)', self._request._medium.base)
633
 
            self._request_start_time = osutils.perf_counter()
 
615
            self._request_start_time = time.time()
634
616
        self._write_args(args)
635
617
        self._request.finished_writing()
636
618
        self._last_verb = args[0]
643
625
        if 'hpss' in debug.debug_flags:
644
626
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
645
627
            if getattr(self._request._medium, '_path', None) is not None:
646
 
                mutter('                  (to %s)',
647
 
                       self._request._medium._path)
 
628
                mutter('                  (to %s)', self._request._medium._path)
648
629
            mutter('              %d bytes', len(body))
649
 
            self._request_start_time = osutils.perf_counter()
 
630
            self._request_start_time = time.time()
650
631
            if 'hpssdetail' in debug.debug_flags:
651
632
                mutter('hpss body content: %s', body)
652
633
        self._write_args(args)
659
640
        """Make a remote call with a readv array.
660
641
 
661
642
        The body is encoded with one line per readv offset pair. The numbers in
662
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
643
        each pair are separated by a comma, and no trailing \n is emitted.
663
644
        """
664
645
        if 'hpss' in debug.debug_flags:
665
646
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
666
647
            if getattr(self._request._medium, '_path', None) is not None:
667
 
                mutter('                  (to %s)',
668
 
                       self._request._medium._path)
669
 
            self._request_start_time = osutils.perf_counter()
 
648
                mutter('                  (to %s)', self._request._medium._path)
 
649
            self._request_start_time = time.time()
670
650
        self._write_args(args)
671
651
        readv_bytes = self._serialise_offsets(body)
672
652
        bytes = self._encode_bulk_data(readv_bytes)
698
678
        if 'hpss' in debug.debug_flags:
699
679
            if self._request_start_time is not None:
700
680
                mutter('   result:   %6.3fs  %s',
701
 
                       osutils.perf_counter() - self._request_start_time,
 
681
                       time.time() - self._request_start_time,
702
682
                       repr(result)[1:-1])
703
683
                self._request_start_time = None
704
684
            else:
724
704
        # returned in response to existing version 1 smart requests.  Responses
725
705
        # starting with these codes are always "failed" responses.
726
706
        v1_error_codes = [
727
 
            b'norepository',
728
 
            b'NoSuchFile',
729
 
            b'FileExists',
730
 
            b'DirectoryNotEmpty',
731
 
            b'ShortReadvError',
732
 
            b'UnicodeEncodeError',
733
 
            b'UnicodeDecodeError',
734
 
            b'ReadOnlyError',
735
 
            b'nobranch',
736
 
            b'NoSuchRevision',
737
 
            b'nosuchrevision',
738
 
            b'LockContention',
739
 
            b'UnlockableTransport',
740
 
            b'LockFailed',
741
 
            b'TokenMismatch',
742
 
            b'ReadError',
743
 
            b'PermissionDenied',
 
707
            'norepository',
 
708
            'NoSuchFile',
 
709
            'FileExists',
 
710
            'DirectoryNotEmpty',
 
711
            'ShortReadvError',
 
712
            'UnicodeEncodeError',
 
713
            'UnicodeDecodeError',
 
714
            'ReadOnlyError',
 
715
            'nobranch',
 
716
            'NoSuchRevision',
 
717
            'nosuchrevision',
 
718
            'LockContention',
 
719
            'UnlockableTransport',
 
720
            'LockFailed',
 
721
            'TokenMismatch',
 
722
            'ReadError',
 
723
            'PermissionDenied',
744
724
            ]
745
725
        if result_tuple[0] in v1_error_codes:
746
726
            self._request.finished_reading()
755
735
        :param verb: The verb used in that call.
756
736
        :raises: UnexpectedSmartServerResponse
757
737
        """
758
 
        if (result_tuple == (b'error', b"Generic bzr smart protocol error: "
759
 
                             b"bad request '" + self._last_verb + b"'")
760
 
            or result_tuple == (b'error', b"Generic bzr smart protocol error: "
761
 
                                b"bad request u'%s'" % self._last_verb)):
 
738
        if (result_tuple == ('error', "Generic bzr smart protocol error: "
 
739
                "bad request '%s'" % self._last_verb) or
 
740
              result_tuple == ('error', "Generic bzr smart protocol error: "
 
741
                "bad request u'%s'" % self._last_verb)):
762
742
            # The response will have no body, so we've finished reading.
763
743
            self._request.finished_reading()
764
744
            raise errors.UnknownSmartMethod(self._last_verb)
775
755
 
776
756
        while not _body_decoder.finished_reading:
777
757
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
778
 
            if bytes == b'':
 
758
            if bytes == '':
779
759
                # end of file encountered reading from server
780
760
                raise errors.ConnectionReset(
781
761
                    "Connection lost while reading response body.")
782
762
            _body_decoder.accept_bytes(bytes)
783
763
        self._request.finished_reading()
784
 
        self._body_buffer = BytesIO(_body_decoder.read_pending_data())
 
764
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
785
765
        # XXX: TODO check the trailer result.
786
766
        if 'hpss' in debug.debug_flags:
787
767
            mutter('              %d body bytes read',
794
774
 
795
775
    def query_version(self):
796
776
        """Return protocol version number of the server."""
797
 
        self.call(b'hello')
 
777
        self.call('hello')
798
778
        resp = self.read_response_tuple()
799
 
        if resp == (b'ok', b'1'):
 
779
        if resp == ('ok', '1'):
800
780
            return 1
801
 
        elif resp == (b'ok', b'2'):
 
781
        elif resp == ('ok', '2'):
802
782
            return 2
803
783
        else:
804
784
            raise errors.SmartProtocolError("bad response %r" % (resp,))
836
816
        response_status = self._request.read_line()
837
817
        result = SmartClientRequestProtocolOne._read_response_tuple(self)
838
818
        self._response_is_unknown_method(result)
839
 
        if response_status == b'success\n':
 
819
        if response_status == 'success\n':
840
820
            self.response_status = True
841
821
            if not expect_body:
842
822
                self._request.finished_reading()
843
823
            return result
844
 
        elif response_status == b'failed\n':
 
824
        elif response_status == 'failed\n':
845
825
            self.response_status = False
846
826
            self._request.finished_reading()
847
827
            raise errors.ErrorFromSmartServer(result)
864
844
        _body_decoder = ChunkedBodyDecoder()
865
845
        while not _body_decoder.finished_reading:
866
846
            bytes = self._request.read_bytes(_body_decoder.next_read_size())
867
 
            if bytes == b'':
 
847
            if bytes == '':
868
848
                # end of file encountered reading from server
869
849
                raise errors.ConnectionReset(
870
850
                    "Connection lost while reading streamed body.")
871
851
            _body_decoder.accept_bytes(bytes)
872
852
            for body_bytes in iter(_body_decoder.read_next_chunk, None):
873
 
                if 'hpss' in debug.debug_flags and isinstance(body_bytes, str):
 
853
                if 'hpss' in debug.debug_flags and type(body_bytes) is str:
874
854
                    mutter('              %d byte chunk read',
875
855
                           len(body_bytes))
876
856
                yield body_bytes
878
858
 
879
859
 
880
860
def build_server_protocol_three(backing_transport, write_func,
881
 
                                root_client_path, jail_root=None):
 
861
                                root_client_path):
882
862
    request_handler = request.SmartServerRequestHandler(
883
863
        backing_transport, commands=request.request_handlers,
884
 
        root_client_path=root_client_path, jail_root=jail_root)
 
864
        root_client_path=root_client_path)
885
865
    responder = ProtocolThreeResponder(write_func)
886
 
    message_handler = message.ConventionalRequestHandler(
887
 
        request_handler, responder)
 
866
    message_handler = message.ConventionalRequestHandler(request_handler, responder)
888
867
    return ProtocolThreeDecoder(message_handler)
889
868
 
890
869
 
914
893
            _StatefulDecoder.accept_bytes(self, bytes)
915
894
        except KeyboardInterrupt:
916
895
            raise
917
 
        except errors.SmartMessageHandlerError as exception:
 
896
        except errors.SmartMessageHandlerError, exception:
918
897
            # We do *not* set self.decoding_failed here.  The message handler
919
898
            # has raised an error, but the decoder is still able to parse bytes
920
899
            # and determine when this message ends.
924
903
            # The state machine is ready to continue decoding, but the
925
904
            # exception has interrupted the loop that runs the state machine.
926
905
            # So we call accept_bytes again to restart it.
927
 
            self.accept_bytes(b'')
928
 
        except Exception as exception:
 
906
            self.accept_bytes('')
 
907
        except Exception, exception:
929
908
            # The decoder itself has raised an exception.  We cannot continue
930
909
            # decoding.
931
910
            self.decoding_failed = True
972
951
            # The buffer is empty
973
952
            raise _NeedMoreBytes(1)
974
953
        in_buf = self._get_in_buffer()
975
 
        one_byte = in_buf[0:1]
 
954
        one_byte = in_buf[0]
976
955
        self._set_in_buffer(in_buf[1:])
977
956
        return one_byte
978
957
 
999
978
 
1000
979
    def _state_accept_expecting_headers(self):
1001
980
        decoded = self._extract_prefixed_bencoded_data()
1002
 
        if not isinstance(decoded, dict):
 
981
        if type(decoded) is not dict:
1003
982
            raise errors.SmartProtocolError(
1004
983
                'Header object %r is not a dict' % (decoded,))
1005
984
        self.state_accept = self._state_accept_expecting_message_part
1010
989
 
1011
990
    def _state_accept_expecting_message_part(self):
1012
991
        message_part_kind = self._extract_single_byte()
1013
 
        if message_part_kind == b'o':
 
992
        if message_part_kind == 'o':
1014
993
            self.state_accept = self._state_accept_expecting_one_byte
1015
 
        elif message_part_kind == b's':
 
994
        elif message_part_kind == 's':
1016
995
            self.state_accept = self._state_accept_expecting_structure
1017
 
        elif message_part_kind == b'b':
 
996
        elif message_part_kind == 'b':
1018
997
            self.state_accept = self._state_accept_expecting_bytes
1019
 
        elif message_part_kind == b'e':
 
998
        elif message_part_kind == 'e':
1020
999
            self.done()
1021
1000
        else:
1022
1001
            raise errors.SmartProtocolError(
1080
1059
class _ProtocolThreeEncoder(object):
1081
1060
 
1082
1061
    response_marker = request_marker = MESSAGE_VERSION_THREE
1083
 
    BUFFER_SIZE = 1024 * 1024  # 1 MiB buffer before flushing
1084
1062
 
1085
1063
    def __init__(self, write_func):
1086
1064
        self._buf = []
1087
 
        self._buf_len = 0
1088
1065
        self._real_write_func = write_func
1089
1066
 
1090
1067
    def _write_func(self, bytes):
1091
 
        # TODO: Another possibility would be to turn this into an async model.
1092
 
        #       Where we let another thread know that we have some bytes if
1093
 
        #       they want it, but we don't actually block for it
1094
 
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
1095
 
        #       we might just push out smaller bits at a time?
1096
1068
        self._buf.append(bytes)
1097
 
        self._buf_len += len(bytes)
1098
 
        if self._buf_len > self.BUFFER_SIZE:
 
1069
        if len(self._buf) > 100:
1099
1070
            self.flush()
1100
1071
 
1101
1072
    def flush(self):
1102
1073
        if self._buf:
1103
 
            self._real_write_func(b''.join(self._buf))
 
1074
            self._real_write_func(''.join(self._buf))
1104
1075
            del self._buf[:]
1105
 
            self._buf_len = 0
1106
1076
 
1107
1077
    def _serialise_offsets(self, offsets):
1108
1078
        """Serialise a readv offset list."""
1109
1079
        txt = []
1110
1080
        for start, length in offsets:
1111
 
            txt.append(b'%d,%d' % (start, length))
1112
 
        return b'\n'.join(txt)
 
1081
            txt.append('%d,%d' % (start, length))
 
1082
        return '\n'.join(txt)
1113
1083
 
1114
1084
    def _write_protocol_version(self):
1115
1085
        self._write_func(MESSAGE_VERSION_THREE)
1123
1093
        self._write_prefixed_bencode(headers)
1124
1094
 
1125
1095
    def _write_structure(self, args):
1126
 
        self._write_func(b's')
 
1096
        self._write_func('s')
1127
1097
        utf8_args = []
1128
1098
        for arg in args:
1129
 
            if isinstance(arg, str):
 
1099
            if type(arg) is unicode:
1130
1100
                utf8_args.append(arg.encode('utf8'))
1131
1101
            else:
1132
1102
                utf8_args.append(arg)
1133
1103
        self._write_prefixed_bencode(utf8_args)
1134
1104
 
1135
1105
    def _write_end(self):
1136
 
        self._write_func(b'e')
 
1106
        self._write_func('e')
1137
1107
        self.flush()
1138
1108
 
1139
1109
    def _write_prefixed_body(self, bytes):
1140
 
        self._write_func(b'b')
 
1110
        self._write_func('b')
1141
1111
        self._write_func(struct.pack('!L', len(bytes)))
1142
1112
        self._write_func(bytes)
1143
1113
 
1144
1114
    def _write_chunked_body_start(self):
1145
 
        self._write_func(b'oC')
 
1115
        self._write_func('oC')
1146
1116
 
1147
1117
    def _write_error_status(self):
1148
 
        self._write_func(b'oE')
 
1118
        self._write_func('oE')
1149
1119
 
1150
1120
    def _write_success_status(self):
1151
 
        self._write_func(b'oS')
 
1121
        self._write_func('oS')
1152
1122
 
1153
1123
 
1154
1124
class ProtocolThreeResponder(_ProtocolThreeEncoder):
1156
1126
    def __init__(self, write_func):
1157
1127
        _ProtocolThreeEncoder.__init__(self, write_func)
1158
1128
        self.response_sent = False
1159
 
        self._headers = {
1160
 
            b'Software version': breezy.__version__.encode('utf-8')}
1161
 
        if 'hpss' in debug.debug_flags:
1162
 
            self._thread_id = _thread.get_ident()
1163
 
            self._response_start_time = None
1164
 
 
1165
 
    def _trace(self, action, message, extra_bytes=None, include_time=False):
1166
 
        if self._response_start_time is None:
1167
 
            self._response_start_time = osutils.perf_counter()
1168
 
        if include_time:
1169
 
            t = '%5.3fs ' % (osutils.perf_counter() - self._response_start_time)
1170
 
        else:
1171
 
            t = ''
1172
 
        if extra_bytes is None:
1173
 
            extra = ''
1174
 
        else:
1175
 
            extra = ' ' + repr(extra_bytes[:40])
1176
 
            if len(extra) > 33:
1177
 
                extra = extra[:29] + extra[-1] + '...'
1178
 
        mutter('%12s: [%s] %s%s%s'
1179
 
               % (action, self._thread_id, t, message, extra))
 
1129
        self._headers = {'Software version': bzrlib.__version__}
1180
1130
 
1181
1131
    def send_error(self, exception):
1182
1132
        if self.response_sent:
1185
1135
                % (exception,))
1186
1136
        if isinstance(exception, errors.UnknownSmartMethod):
1187
1137
            failure = request.FailedSmartServerResponse(
1188
 
                (b'UnknownMethod', exception.verb))
 
1138
                ('UnknownMethod', exception.verb))
1189
1139
            self.send_response(failure)
1190
1140
            return
1191
 
        if 'hpss' in debug.debug_flags:
1192
 
            self._trace('error', str(exception))
1193
1141
        self.response_sent = True
1194
1142
        self._write_protocol_version()
1195
1143
        self._write_headers(self._headers)
1196
1144
        self._write_error_status()
1197
 
        self._write_structure(
1198
 
            (b'error', str(exception).encode('utf-8', 'replace')))
 
1145
        self._write_structure(('error', str(exception)))
1199
1146
        self._write_end()
1200
1147
 
1201
1148
    def send_response(self, response):
1210
1157
            self._write_success_status()
1211
1158
        else:
1212
1159
            self._write_error_status()
1213
 
        if 'hpss' in debug.debug_flags:
1214
 
            self._trace('response', repr(response.args))
1215
1160
        self._write_structure(response.args)
1216
1161
        if response.body is not None:
1217
1162
            self._write_prefixed_body(response.body)
1218
 
            if 'hpss' in debug.debug_flags:
1219
 
                self._trace('body', '%d bytes' % (len(response.body),),
1220
 
                            response.body, include_time=True)
1221
1163
        elif response.body_stream is not None:
1222
 
            count = num_bytes = 0
1223
 
            first_chunk = None
1224
1164
            for exc_info, chunk in _iter_with_errors(response.body_stream):
1225
 
                count += 1
1226
1165
                if exc_info is not None:
1227
1166
                    self._write_error_status()
1228
1167
                    error_struct = request._translate_error(exc_info[1])
1233
1172
                        self._write_error_status()
1234
1173
                        self._write_structure(chunk.args)
1235
1174
                        break
1236
 
                    num_bytes += len(chunk)
1237
 
                    if first_chunk is None:
1238
 
                        first_chunk = chunk
1239
1175
                    self._write_prefixed_body(chunk)
1240
 
                    self.flush()
1241
 
                    if 'hpssdetail' in debug.debug_flags:
1242
 
                        # Not worth timing separately, as _write_func is
1243
 
                        # actually buffered
1244
 
                        self._trace('body chunk',
1245
 
                                    '%d bytes' % (len(chunk),),
1246
 
                                    chunk, suppress_time=True)
1247
 
            if 'hpss' in debug.debug_flags:
1248
 
                self._trace('body stream',
1249
 
                            '%d bytes %d chunks' % (num_bytes, count),
1250
 
                            first_chunk)
1251
1176
        self._write_end()
1252
 
        if 'hpss' in debug.debug_flags:
1253
 
            self._trace('response end', '', include_time=True)
1254
1177
 
1255
1178
 
1256
1179
def _iter_with_errors(iterable):
1280
1203
    iterator = iter(iterable)
1281
1204
    while True:
1282
1205
        try:
1283
 
            yield None, next(iterator)
 
1206
            yield None, iterator.next()
1284
1207
        except StopIteration:
1285
1208
            return
1286
1209
        except (KeyboardInterrupt, SystemExit):
1298
1221
        _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
1299
1222
        self._medium_request = medium_request
1300
1223
        self._headers = {}
1301
 
        self.body_stream_started = None
1302
1224
 
1303
1225
    def set_headers(self, headers):
1304
1226
        self._headers = headers.copy()
1309
1231
            base = getattr(self._medium_request._medium, 'base', None)
1310
1232
            if base is not None:
1311
1233
                mutter('             (to %s)', base)
1312
 
            self._request_start_time = osutils.perf_counter()
 
1234
            self._request_start_time = time.time()
1313
1235
        self._write_protocol_version()
1314
1236
        self._write_headers(self._headers)
1315
1237
        self._write_structure(args)
1327
1249
            if path is not None:
1328
1250
                mutter('                  (to %s)', path)
1329
1251
            mutter('              %d bytes', len(body))
1330
 
            self._request_start_time = osutils.perf_counter()
 
1252
            self._request_start_time = time.time()
1331
1253
        self._write_protocol_version()
1332
1254
        self._write_headers(self._headers)
1333
1255
        self._write_structure(args)
1339
1261
        """Make a remote call with a readv array.
1340
1262
 
1341
1263
        The body is encoded with one line per readv offset pair. The numbers in
1342
 
        each pair are separated by a comma, and no trailing \\n is emitted.
 
1264
        each pair are separated by a comma, and no trailing \n is emitted.
1343
1265
        """
1344
1266
        if 'hpss' in debug.debug_flags:
1345
1267
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
1346
1268
            path = getattr(self._medium_request._medium, '_path', None)
1347
1269
            if path is not None:
1348
1270
                mutter('                  (to %s)', path)
1349
 
            self._request_start_time = osutils.perf_counter()
 
1271
            self._request_start_time = time.time()
1350
1272
        self._write_protocol_version()
1351
1273
        self._write_headers(self._headers)
1352
1274
        self._write_structure(args)
1363
1285
            path = getattr(self._medium_request._medium, '_path', None)
1364
1286
            if path is not None:
1365
1287
                mutter('                  (to %s)', path)
1366
 
            self._request_start_time = osutils.perf_counter()
1367
 
        self.body_stream_started = False
 
1288
            self._request_start_time = time.time()
1368
1289
        self._write_protocol_version()
1369
1290
        self._write_headers(self._headers)
1370
1291
        self._write_structure(args)
1372
1293
        #       have finished sending the stream.  We would notice at the end
1373
1294
        #       anyway, but if the medium can deliver it early then it's good
1374
1295
        #       to short-circuit the whole request...
1375
 
        # Provoke any ConnectionReset failures before we start the body stream.
1376
 
        self.flush()
1377
 
        self.body_stream_started = True
1378
1296
        for exc_info, part in _iter_with_errors(stream):
1379
1297
            if exc_info is not None:
1380
1298
                # Iterating the stream failed.  Cleanly abort the request.
1381
1299
                self._write_error_status()
1382
1300
                # Currently the client unconditionally sends ('error',) as the
1383
1301
                # error args.
1384
 
                self._write_structure((b'error',))
 
1302
                self._write_structure(('error',))
1385
1303
                self._write_end()
1386
1304
                self._medium_request.finished_writing()
1387
 
                (exc_type, exc_val, exc_tb) = exc_info
1388
 
                try:
1389
 
                    raise exc_val
1390
 
                finally:
1391
 
                    del exc_info
 
1305
                raise exc_info[0], exc_info[1], exc_info[2]
1392
1306
            else:
1393
1307
                self._write_prefixed_body(part)
1394
1308
                self.flush()
1395
1309
        self._write_end()
1396
1310
        self._medium_request.finished_writing()
 
1311