22
21
from cStringIO import StringIO
24
from bzrlib import debug
24
25
from bzrlib import errors
25
26
from bzrlib.smart import request
27
from bzrlib.trace import log_exception_quietly, mutter
30
# Protocol version markers.  Every bzr request or response begins with one
# of these strings so the peer can tell which protocol version is in use.
# (Version one has no marker because that version does not send one.)
REQUEST_VERSION_TWO = 'bzr request 2\n'
RESPONSE_VERSION_TWO = 'bzr response 2\n'
28
37
def _recv_tuple(from_file):
97
106
self.excess_buffer = self.in_buffer
98
107
self.in_buffer = ''
99
self._send_response(self.request.response.args,
100
self.request.response.body)
108
self._send_response(self.request.response)
101
109
except KeyboardInterrupt:
103
111
except Exception, exception:
104
112
# everything else: pass to client, flush, and quit
105
self._send_response(('error', str(exception)))
113
log_exception_quietly()
114
self._send_response(request.FailedSmartServerResponse(
115
('error', str(exception))))
108
118
if self.has_dispatched:
123
133
assert self.request.finished_reading, \
124
134
"no more body, request not finished"
125
135
if self.request.response is not None:
126
self._send_response(self.request.response.args,
127
self.request.response.body)
136
self._send_response(self.request.response)
128
137
self.excess_buffer = self.in_buffer
129
138
self.in_buffer = ''
131
140
assert not self.request.finished_reading, \
132
141
"no response and we have finished reading."
134
def _send_response(self, args, body=None):
143
def _send_response(self, response):
135
144
"""Send a smart server response down the output stream."""
136
145
assert not self._finished, 'response already sent'
137
148
self._finished = True
149
self._write_protocol_version()
150
self._write_success_or_failure_prefix(response)
138
151
self._write_func(_encode_tuple(args))
139
152
if body is not None:
140
153
assert isinstance(body, str), 'body must be a str'
141
154
bytes = self._encode_bulk_data(body)
142
155
self._write_func(bytes)
157
def _write_protocol_version(self):
158
"""Write any prefixes this protocol requires.
160
Version one doesn't send protocol versions.
163
def _write_success_or_failure_prefix(self, response):
164
"""Write the protocol specific success/failure prefix.
166
For SmartServerRequestProtocolOne this is omitted but we
167
call is_successful to ensure that the response is valid.
169
response.is_successful()
144
171
def next_read_size(self):
145
172
if self._finished:
150
177
return self._body_decoder.next_read_size()
180
class SmartServerRequestProtocolTwo(SmartServerRequestProtocolOne):
    r"""Version two of the server side of the smart protocol.

    This prefixes responses with the value of RESPONSE_VERSION_TWO.
    """

    def _write_success_or_failure_prefix(self, response):
        """Write the protocol specific success/failure prefix.

        Exactly one status line is written: the success line when
        response.is_successful() is true, the failed line otherwise.

        :param response: the response about to be sent; it must provide an
            is_successful() method.
        """
        if response.is_successful():
            self._write_func('success\n')
        else:
            self._write_func('failed\n')

    def _write_protocol_version(self):
        r"""Write any prefixes this protocol requires.

        Version two sends the value of RESPONSE_VERSION_TWO.
        """
        self._write_func(RESPONSE_VERSION_TWO)
153
201
class LengthPrefixedBodyDecoder(object):
154
202
"""Decodes the length-prefixed bulk data."""
253
301
self._request = request
254
302
self._body_buffer = None
303
self._request_start_time = None
256
305
def call(self, *args):
257
bytes = _encode_tuple(args)
258
self._request.accept_bytes(bytes)
306
if 'hpss' in debug.debug_flags:
307
mutter('hpss call: %s', repr(args)[1:-1])
308
self._request_start_time = time.time()
309
self._write_args(args)
259
310
self._request.finished_writing()
261
312
def call_with_body_bytes(self, args, body):
264
315
After calling this, call read_response_tuple to find the result out.
266
bytes = _encode_tuple(args)
267
self._request.accept_bytes(bytes)
317
if 'hpss' in debug.debug_flags:
318
mutter('hpss call w/body: %s (%r...)', repr(args)[1:-1], body[:20])
319
mutter(' %d bytes', len(body))
320
self._request_start_time = time.time()
321
self._write_args(args)
268
322
bytes = self._encode_bulk_data(body)
269
323
self._request.accept_bytes(bytes)
270
324
self._request.finished_writing()
275
329
The body is encoded with one line per readv offset pair. The numbers in
276
330
each pair are separated by a comma, and no trailing \n is emitted.
278
bytes = _encode_tuple(args)
279
self._request.accept_bytes(bytes)
332
if 'hpss' in debug.debug_flags:
333
mutter('hpss call w/readv: %s', repr(args)[1:-1])
334
self._request_start_time = time.time()
335
self._write_args(args)
280
336
readv_bytes = self._serialise_offsets(body)
281
337
bytes = self._encode_bulk_data(readv_bytes)
282
338
self._request.accept_bytes(bytes)
283
339
self._request.finished_writing()
340
if 'hpss' in debug.debug_flags:
341
mutter(' %d bytes in readv request', len(readv_bytes))
285
343
def cancel_read_body(self):
286
344
"""After expecting a body, a response code may indicate one otherwise.
297
355
This should only be called once.
299
357
result = self._recv_tuple()
358
if 'hpss' in debug.debug_flags:
359
if self._request_start_time is not None:
360
mutter(' result: %6.3fs %s',
361
time.time() - self._request_start_time,
363
self._request_start_time = None
365
mutter(' result: %s', repr(result)[1:-1])
300
366
if not expect_body:
301
367
self._request.finished_reading()
318
384
self._request.finished_reading()
319
385
self._body_buffer = StringIO(_body_decoder.read_pending_data())
320
386
# XXX: TODO check the trailer result.
387
if 'hpss' in debug.debug_flags:
388
mutter(' %d body bytes read',
389
len(self._body_buffer.getvalue()))
321
390
return self._body_buffer.read(count)
323
392
def _recv_tuple(self):
    """Read one line from the medium request and decode it as a tuple."""
    line = self._recv_line()
    return _decode_tuple(line)
396
def _recv_line(self):
397
"""Read an entire line from the medium request."""
326
399
while not line or line[-1] != '\n':
327
400
# TODO: this is inefficient - but tuples are short.
328
401
new_char = self._request.read_bytes(1)
330
403
assert new_char != '', "end of file reading from server."
331
return _decode_tuple(line)
333
406
def query_version(self):
334
407
"""Return protocol version number of the server."""
336
409
resp = self.read_response_tuple()
337
410
if resp == ('ok', '1'):
412
elif resp == ('ok', '2'):
340
415
raise errors.SmartProtocolError("bad response %r" % (resp,))
417
def _write_args(self, args):
    """Send the protocol version prefix, then the encoded args tuple.

    Both are handed to the medium request; the version prefix is whatever
    _write_protocol_version emits for this protocol.
    """
    self._write_protocol_version()
    self._request.accept_bytes(_encode_tuple(args))
422
def _write_protocol_version(self):
423
"""Write any prefixes this protocol requires.
425
Version one doesn't send protocol versions.
429
class SmartClientRequestProtocolTwo(SmartClientRequestProtocolOne):
    """Version two of the client side of the smart protocol.

    This prefixes the request with the value of REQUEST_VERSION_TWO.
    """

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        This should only be called once.

        The response must begin with the RESPONSE_VERSION_TWO marker line
        followed by a success or failed status line.  The status is
        recorded on self.response_status (True for success) before the
        rest of the response is read by the version one implementation.

        :param expect_body: passed through unchanged to
            SmartClientRequestProtocolOne.read_response_tuple.
        :raises errors.SmartProtocolError: if the version marker or the
            status line is not one of the expected values.
        """
        version = self._request.read_line()
        if version != RESPONSE_VERSION_TWO:
            raise errors.SmartProtocolError('bad protocol marker %r' % version)
        response_status = self._recv_line()
        if response_status not in ('success\n', 'failed\n'):
            raise errors.SmartProtocolError(
                'bad protocol status %r' % response_status)
        self.response_status = response_status == 'success\n'
        return SmartClientRequestProtocolOne.read_response_tuple(
            self, expect_body)

    def _write_protocol_version(self):
        r"""Write any prefixes this protocol requires.

        Version two sends the value of REQUEST_VERSION_TWO.
        """
        self._request.accept_bytes(REQUEST_VERSION_TWO)