/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Arbash Meinel
  • Date: 2007-07-31 21:20:02 UTC
  • mto: This revision was merged to the branch mainline in revision 2688.
  • Revision ID: john@arbash-meinel.com-20070731212002-tq2bht51jbfedjlk
Stop tracking medium bytes if we aren't going to use it

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Wire-level encoding and decoding of requests and responses for the smart
 
18
client and server.
 
19
"""
 
20
 
 
21
from cStringIO import StringIO
 
22
import time
 
23
 
 
24
from bzrlib import debug
 
25
from bzrlib import errors
 
26
from bzrlib.smart import request
 
27
from bzrlib.trace import mutter
 
28
 
 
29
 
 
30
# Protocol version strings.  These are sent as prefixes of bzr requests and
 
31
# responses to identify the protocol version being used. (There are no version
 
32
# one strings because that version doesn't send any).
 
33
REQUEST_VERSION_TWO = 'bzr request 2\n'
 
34
RESPONSE_VERSION_TWO = 'bzr response 2\n'
 
35
 
 
36
 
 
37
def _recv_tuple(from_file):
 
38
    req_line = from_file.readline()
 
39
    return _decode_tuple(req_line)
 
40
 
 
41
 
 
42
def _decode_tuple(req_line):
 
43
    if req_line == None or req_line == '':
 
44
        return None
 
45
    if req_line[-1] != '\n':
 
46
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
47
    return tuple(req_line[:-1].split('\x01'))
 
48
 
 
49
 
 
50
def _encode_tuple(args):
 
51
    """Encode the tuple args to a bytestream."""
 
52
    return '\x01'.join(args) + '\n'
 
53
 
 
54
 
 
55
class SmartProtocolBase(object):
 
56
    """Methods common to client and server"""
 
57
 
 
58
    # TODO: this only actually accomodates a single block; possibly should
 
59
    # support multiple chunks?
 
60
    def _encode_bulk_data(self, body):
 
61
        """Encode body as a bulk data chunk."""
 
62
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
63
 
 
64
    def _serialise_offsets(self, offsets):
 
65
        """Serialise a readv offset list."""
 
66
        txt = []
 
67
        for start, length in offsets:
 
68
            txt.append('%d,%d' % (start, length))
 
69
        return '\n'.join(txt)
 
70
        
 
71
 
 
72
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
73
    """Server-side encoding and decoding logic for smart version 1."""
 
74
    
 
75
    def __init__(self, backing_transport, write_func):
 
76
        self._backing_transport = backing_transport
 
77
        self.excess_buffer = ''
 
78
        self._finished = False
 
79
        self.in_buffer = ''
 
80
        self.has_dispatched = False
 
81
        self.request = None
 
82
        self._body_decoder = None
 
83
 
 
84
    def accept_bytes(self, bytes):
 
85
        """Take bytes, and advance the internal state machine appropriately.
 
86
        
 
87
        :param bytes: must be a byte string
 
88
        """
 
89
        assert isinstance(bytes, str)
 
90
        self.in_buffer += bytes
 
91
        if not self.has_dispatched:
 
92
            if '\n' not in self.in_buffer:
 
93
                # no command line yet
 
94
                return
 
95
            self.has_dispatched = True
 
96
            try:
 
97
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
98
                first_line += '\n'
 
99
                req_args = _decode_tuple(first_line)
 
100
                self.request = request.SmartServerRequestHandler(
 
101
                    self._backing_transport, commands=request.request_handlers)
 
102
                self.request.dispatch_command(req_args[0], req_args[1:])
 
103
                if self.request.finished_reading:
 
104
                    # trivial request
 
105
                    self.excess_buffer = self.in_buffer
 
106
                    self.in_buffer = ''
 
107
                    self._send_response(self.request.response)
 
108
            except KeyboardInterrupt:
 
109
                raise
 
110
            except Exception, exception:
 
111
                # everything else: pass to client, flush, and quit
 
112
                self._send_response(request.FailedSmartServerResponse(
 
113
                    ('error', str(exception))))
 
114
                return
 
115
 
 
116
        if self.has_dispatched:
 
117
            if self._finished:
 
118
                # nothing to do.XXX: this routine should be a single state 
 
119
                # machine too.
 
120
                self.excess_buffer += self.in_buffer
 
121
                self.in_buffer = ''
 
122
                return
 
123
            if self._body_decoder is None:
 
124
                self._body_decoder = LengthPrefixedBodyDecoder()
 
125
            self._body_decoder.accept_bytes(self.in_buffer)
 
126
            self.in_buffer = self._body_decoder.unused_data
 
127
            body_data = self._body_decoder.read_pending_data()
 
128
            self.request.accept_body(body_data)
 
129
            if self._body_decoder.finished_reading:
 
130
                self.request.end_of_body()
 
131
                assert self.request.finished_reading, \
 
132
                    "no more body, request not finished"
 
133
            if self.request.response is not None:
 
134
                self._send_response(self.request.response)
 
135
                self.excess_buffer = self.in_buffer
 
136
                self.in_buffer = ''
 
137
            else:
 
138
                assert not self.request.finished_reading, \
 
139
                    "no response and we have finished reading."
 
140
 
 
141
    def _send_response(self, response):
 
142
        """Send a smart server response down the output stream."""
 
143
        assert not self._finished, 'response already sent'
 
144
        args = response.args
 
145
        body = response.body
 
146
        self._finished = True
 
147
        self._write_protocol_version()
 
148
        self._write_success_or_failure_prefix(response)
 
149
        self._write_func(_encode_tuple(args))
 
150
        if body is not None:
 
151
            assert isinstance(body, str), 'body must be a str'
 
152
            bytes = self._encode_bulk_data(body)
 
153
            self._write_func(bytes)
 
154
 
 
155
    def _write_protocol_version(self):
 
156
        """Write any prefixes this protocol requires.
 
157
        
 
158
        Version one doesn't send protocol versions.
 
159
        """
 
160
 
 
161
    def _write_success_or_failure_prefix(self, response):
 
162
        """Write the protocol specific success/failure prefix.
 
163
 
 
164
        For SmartServerRequestProtocolOne this is omitted but we
 
165
        call is_successful to ensure that the response is valid.
 
166
        """
 
167
        response.is_successful()
 
168
 
 
169
    def next_read_size(self):
 
170
        if self._finished:
 
171
            return 0
 
172
        if self._body_decoder is None:
 
173
            return 1
 
174
        else:
 
175
            return self._body_decoder.next_read_size()
 
176
 
 
177
 
 
178
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
 
179
    r"""Version two of the server side of the smart protocol.
 
180
   
 
181
    This prefixes responses with the value of RESPONSE_VERSION_TWO.
 
182
    """
 
183
 
 
184
    def _write_success_or_failure_prefix(self, response):
 
185
        """Write the protocol specific success/failure prefix."""
 
186
        if response.is_successful():
 
187
            self._write_func('success\n')
 
188
        else:
 
189
            self._write_func('failed\n')
 
190
 
 
191
    def _write_protocol_version(self):
 
192
        r"""Write any prefixes this protocol requires.
 
193
        
 
194
        Version two sends the value of RESPONSE_VERSION_TWO.
 
195
        """
 
196
        self._write_func(RESPONSE_VERSION_TWO)
 
197
 
 
198
 
 
199
class LengthPrefixedBodyDecoder(object):
 
200
    """Decodes the length-prefixed bulk data."""
 
201
    
 
202
    def __init__(self):
 
203
        self.bytes_left = None
 
204
        self.finished_reading = False
 
205
        self.unused_data = ''
 
206
        self.state_accept = self._state_accept_expecting_length
 
207
        self.state_read = self._state_read_no_data
 
208
        self._in_buffer = ''
 
209
        self._trailer_buffer = ''
 
210
    
 
211
    def accept_bytes(self, bytes):
 
212
        """Decode as much of bytes as possible.
 
213
 
 
214
        If 'bytes' contains too much data it will be appended to
 
215
        self.unused_data.
 
216
 
 
217
        finished_reading will be set when no more data is required.  Further
 
218
        data will be appended to self.unused_data.
 
219
        """
 
220
        # accept_bytes is allowed to change the state
 
221
        current_state = self.state_accept
 
222
        self.state_accept(bytes)
 
223
        while current_state != self.state_accept:
 
224
            current_state = self.state_accept
 
225
            self.state_accept('')
 
226
 
 
227
    def next_read_size(self):
 
228
        if self.bytes_left is not None:
 
229
            # Ideally we want to read all the remainder of the body and the
 
230
            # trailer in one go.
 
231
            return self.bytes_left + 5
 
232
        elif self.state_accept == self._state_accept_reading_trailer:
 
233
            # Just the trailer left
 
234
            return 5 - len(self._trailer_buffer)
 
235
        elif self.state_accept == self._state_accept_expecting_length:
 
236
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
237
            # 'done\n').
 
238
            return 6
 
239
        else:
 
240
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
241
            return 1
 
242
        
 
243
    def read_pending_data(self):
 
244
        """Return any pending data that has been decoded."""
 
245
        return self.state_read()
 
246
 
 
247
    def _state_accept_expecting_length(self, bytes):
 
248
        self._in_buffer += bytes
 
249
        pos = self._in_buffer.find('\n')
 
250
        if pos == -1:
 
251
            return
 
252
        self.bytes_left = int(self._in_buffer[:pos])
 
253
        self._in_buffer = self._in_buffer[pos+1:]
 
254
        self.bytes_left -= len(self._in_buffer)
 
255
        self.state_accept = self._state_accept_reading_body
 
256
        self.state_read = self._state_read_in_buffer
 
257
 
 
258
    def _state_accept_reading_body(self, bytes):
 
259
        self._in_buffer += bytes
 
260
        self.bytes_left -= len(bytes)
 
261
        if self.bytes_left <= 0:
 
262
            # Finished with body
 
263
            if self.bytes_left != 0:
 
264
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
265
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
266
            self.bytes_left = None
 
267
            self.state_accept = self._state_accept_reading_trailer
 
268
        
 
269
    def _state_accept_reading_trailer(self, bytes):
 
270
        self._trailer_buffer += bytes
 
271
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
272
        # a ProtocolViolation exception?
 
273
        if self._trailer_buffer.startswith('done\n'):
 
274
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
275
            self.state_accept = self._state_accept_reading_unused
 
276
            self.finished_reading = True
 
277
    
 
278
    def _state_accept_reading_unused(self, bytes):
 
279
        self.unused_data += bytes
 
280
 
 
281
    def _state_read_no_data(self):
 
282
        return ''
 
283
 
 
284
    def _state_read_in_buffer(self):
 
285
        result = self._in_buffer
 
286
        self._in_buffer = ''
 
287
        return result
 
288
 
 
289
 
 
290
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
291
    """The client-side protocol for smart version 1."""
 
292
 
 
293
    def __init__(self, request):
 
294
        """Construct a SmartClientRequestProtocolOne.
 
295
 
 
296
        :param request: A SmartClientMediumRequest to serialise onto and
 
297
            deserialise from.
 
298
        """
 
299
        self._request = request
 
300
        self._body_buffer = None
 
301
        self._request_start_time = None
 
302
 
 
303
    def call(self, *args):
 
304
        if 'hpss' in debug.debug_flags:
 
305
            mutter('hpss call:   %s', repr(args)[1:-1])
 
306
            self._request_start_time = time.time()
 
307
        self._write_args(args)
 
308
        self._request.finished_writing()
 
309
 
 
310
    def call_with_body_bytes(self, args, body):
 
311
        """Make a remote call of args with body bytes 'body'.
 
312
 
 
313
        After calling this, call read_response_tuple to find the result out.
 
314
        """
 
315
        if 'hpss' in debug.debug_flags:
 
316
            mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
 
317
            mutter('              %d bytes', len(body))
 
318
            self._request_start_time = time.time()
 
319
        self._write_args(args)
 
320
        bytes = self._encode_bulk_data(body)
 
321
        self._request.accept_bytes(bytes)
 
322
        self._request.finished_writing()
 
323
 
 
324
    def call_with_body_readv_array(self, args, body):
 
325
        """Make a remote call with a readv array.
 
326
 
 
327
        The body is encoded with one line per readv offset pair. The numbers in
 
328
        each pair are separated by a comma, and no trailing \n is emitted.
 
329
        """
 
330
        if 'hpss' in debug.debug_flags:
 
331
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
 
332
            self._request_start_time = time.time()
 
333
        self._write_args(args)
 
334
        readv_bytes = self._serialise_offsets(body)
 
335
        bytes = self._encode_bulk_data(readv_bytes)
 
336
        self._request.accept_bytes(bytes)
 
337
        self._request.finished_writing()
 
338
        if 'hpss' in debug.debug_flags:
 
339
            mutter('              %d bytes in readv request', len(readv_bytes))
 
340
 
 
341
    def cancel_read_body(self):
 
342
        """After expecting a body, a response code may indicate one otherwise.
 
343
 
 
344
        This method lets the domain client inform the protocol that no body
 
345
        will be transmitted. This is a terminal method: after calling it the
 
346
        protocol is not able to be used further.
 
347
        """
 
348
        self._request.finished_reading()
 
349
 
 
350
    def read_response_tuple(self, expect_body=False):
 
351
        """Read a response tuple from the wire.
 
352
 
 
353
        This should only be called once.
 
354
        """
 
355
        result = self._recv_tuple()
 
356
        if 'hpss' in debug.debug_flags:
 
357
            if self._request_start_time is not None:
 
358
                mutter('   result:   %6.3fs  %s',
 
359
                       time.time() - self._request_start_time,
 
360
                       repr(result)[1:-1])
 
361
                self._request_start_time = None
 
362
            else:
 
363
                mutter('   result:   %s', repr(result)[1:-1])
 
364
        if not expect_body:
 
365
            self._request.finished_reading()
 
366
        return result
 
367
 
 
368
    def read_body_bytes(self, count=-1):
 
369
        """Read bytes from the body, decoding into a byte stream.
 
370
        
 
371
        We read all bytes at once to ensure we've checked the trailer for 
 
372
        errors, and then feed the buffer back as read_body_bytes is called.
 
373
        """
 
374
        if self._body_buffer is not None:
 
375
            return self._body_buffer.read(count)
 
376
        _body_decoder = LengthPrefixedBodyDecoder()
 
377
 
 
378
        while not _body_decoder.finished_reading:
 
379
            bytes_wanted = _body_decoder.next_read_size()
 
380
            bytes = self._request.read_bytes(bytes_wanted)
 
381
            _body_decoder.accept_bytes(bytes)
 
382
        self._request.finished_reading()
 
383
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
384
        # XXX: TODO check the trailer result.
 
385
        if 'hpss' in debug.debug_flags:
 
386
            mutter('              %d body bytes read',
 
387
                   len(self._body_buffer.getvalue()))
 
388
        return self._body_buffer.read(count)
 
389
 
 
390
    def _recv_tuple(self):
 
391
        """Receive a tuple from the medium request."""
 
392
        return _decode_tuple(self._recv_line())
 
393
 
 
394
    def _recv_line(self):
 
395
        """Read an entire line from the medium request."""
 
396
        line = ''
 
397
        while not line or line[-1] != '\n':
 
398
            # TODO: this is inefficient - but tuples are short.
 
399
            new_char = self._request.read_bytes(1)
 
400
            line += new_char
 
401
            assert new_char != '', "end of file reading from server."
 
402
        return line
 
403
 
 
404
    def query_version(self):
 
405
        """Return protocol version number of the server."""
 
406
        self.call('hello')
 
407
        resp = self.read_response_tuple()
 
408
        if resp == ('ok', '1'):
 
409
            return 1
 
410
        elif resp == ('ok', '2'):
 
411
            return 2
 
412
        else:
 
413
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
414
 
 
415
    def _write_args(self, args):
 
416
        self._write_protocol_version()
 
417
        bytes = _encode_tuple(args)
 
418
        self._request.accept_bytes(bytes)
 
419
 
 
420
    def _write_protocol_version(self):
 
421
        """Write any prefixes this protocol requires.
 
422
        
 
423
        Version one doesn't send protocol versions.
 
424
        """
 
425
 
 
426
 
 
427
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
 
428
    """Version two of the client side of the smart protocol.
 
429
    
 
430
    This prefixes the request with the value of REQUEST_VERSION_TWO.
 
431
    """
 
432
 
 
433
    def read_response_tuple(self, expect_body=False):
 
434
        """Read a response tuple from the wire.
 
435
 
 
436
        This should only be called once.
 
437
        """
 
438
        version = self._request.read_line()
 
439
        if version != RESPONSE_VERSION_TWO:
 
440
            raise errors.SmartProtocolError('bad protocol marker %r' % version)
 
441
        response_status = self._recv_line()
 
442
        if response_status not in ('success\n', 'failed\n'):
 
443
            raise errors.SmartProtocolError(
 
444
                'bad protocol status %r' % response_status)
 
445
        self.response_status = response_status == 'success\n'
 
446
        return SmartClientRequestProtocolOne.read_response_tuple(self, expect_body)
 
447
 
 
448
    def _write_protocol_version(self):
 
449
        r"""Write any prefixes this protocol requires.
 
450
        
 
451
        Version two sends the value of REQUEST_VERSION_TWO.
 
452
        """
 
453
        self._request.accept_bytes(REQUEST_VERSION_TWO)
 
454