3
from cStringIO import StringIO
5
from bzrlib import errors
6
from bzrlib.smart import request
9
def _recv_tuple(from_file):
    """Read one line from from_file and decode it as a tuple.

    :param from_file: an object providing readline().
    :return: the result of passing the line read to _decode_tuple.
    """
    req_line = from_file.readline()
    return _decode_tuple(req_line)
14
def _decode_tuple(req_line):
15
if req_line == None or req_line == '':
17
if req_line[-1] != '\n':
18
raise errors.SmartProtocolError("request %r not terminated" % req_line)
19
return tuple(req_line[:-1].split('\x01'))
22
def _encode_tuple(args):
23
"""Encode the tuple args to a bytestream."""
24
return '\x01'.join(args) + '\n'
27
class SmartProtocolBase(object):
28
"""Methods common to client and server"""
30
# TODO: this only actually accomodates a single block; possibly should
31
# support multiple chunks?
32
def _encode_bulk_data(self, body):
33
"""Encode body as a bulk data chunk."""
34
return ''.join(('%d\n' % len(body), body, 'done\n'))
36
def _serialise_offsets(self, offsets):
37
"""Serialise a readv offset list."""
39
for start, length in offsets:
40
txt.append('%d,%d' % (start, length))
44
class SmartServerRequestProtocolOne(SmartProtocolBase):
    """Server-side encoding and decoding logic for smart version 1."""

    def __init__(self, backing_transport, write_func):
        """Create the protocol state for one request.

        :param backing_transport: handed to the SmartServerRequestHandler
            that is created when the request line arrives.
        :param write_func: callable invoked with each encoded byte string of
            the response.
        """
        self._backing_transport = backing_transport
        # Bytes received that were not consumed by this request.
        self.excess_buffer = ''
        # Set by _send_response; a response may only be sent once.
        self._finished = False
        # Bytes received but not yet processed.
        self.in_buffer = ''
        self.has_dispatched = False
        self.request = None
        self._body_decoder = None
        self._write_func = write_func

    def accept_bytes(self, bytes):
        """Take bytes, and advance the internal state machine appropriately.

        The first complete line is decoded as the request tuple and
        dispatched; any bytes after it are fed to a LengthPrefixedBodyDecoder
        as the request body until the request produces a response.

        :param bytes: must be a byte string
        """
        assert isinstance(bytes, str)
        self.in_buffer += bytes
        if not self.has_dispatched:
            if '\n' not in self.in_buffer:
                # The request line is not complete yet.
                return
            self.has_dispatched = True
            try:
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
                # split() dropped the terminator that _decode_tuple insists on.
                first_line += '\n'
                req_args = _decode_tuple(first_line)
                self.request = request.SmartServerRequestHandler(
                    self._backing_transport)
                self.request.dispatch_command(req_args[0], req_args[1:])
                if self.request.finished_reading:
                    # The request needed no body: whatever followed the
                    # request line belongs to someone else.
                    self.excess_buffer = self.in_buffer
                    self.in_buffer = ''
                    self._send_response(self.request.response.args,
                        self.request.response.body)
            except KeyboardInterrupt:
                raise
            except Exception:
                # everything else: pass to client, flush, and quit
                # sys.exc_info() is used instead of binding the exception in
                # the except clause so the syntax is valid on any Python.
                import sys
                exception = sys.exc_info()[1]
                self._send_response(('error', str(exception)))
                return

        if self.has_dispatched:
            if self._finished:
                # nothing to do.XXX: this routine should be a single state
                # machine too.
                self.excess_buffer += self.in_buffer
                self.in_buffer = ''
                return
            if self._body_decoder is None:
                self._body_decoder = LengthPrefixedBodyDecoder()
            self._body_decoder.accept_bytes(self.in_buffer)
            self.in_buffer = self._body_decoder.unused_data
            body_data = self._body_decoder.read_pending_data()
            self.request.accept_body(body_data)
            if self._body_decoder.finished_reading:
                self.request.end_of_body()
                assert self.request.finished_reading, \
                    "no more body, request not finished"
            if self.request.response is not None:
                self._send_response(self.request.response.args,
                    self.request.response.body)
                self.excess_buffer = self.in_buffer
                self.in_buffer = ''
            else:
                assert not self.request.finished_reading, \
                    "no response and we have finished reading."

    def _send_response(self, args, body=None):
        """Send a smart server response down the output stream.

        :param args: the response tuple; written first via _encode_tuple.
        :param body: optional byte string written after the tuple as a bulk
            data chunk; nothing further is written when it is None.
        """
        assert not self._finished, 'response already sent'
        self._finished = True
        self._write_func(_encode_tuple(args))
        if body is not None:
            assert isinstance(body, str), 'body must be a str'
            encoded_body = self._encode_bulk_data(body)
            self._write_func(encoded_body)

    def next_read_size(self):
        """Return the number of bytes the caller should read next.

        Once a body decoder exists the answer is delegated to it.
        """
        # NOTE(review): the two constant results below (0 once a response
        # has been sent, 1 while still reading the request line) are
        # assumptions - confirm against the medium that calls this.
        if self._finished:
            return 0
        if self._body_decoder is None:
            return 1
        return self._body_decoder.next_read_size()
134
class LengthPrefixedBodyDecoder(object):
    """Decodes the length-prefixed bulk data.

    The input is a decimal length terminated by a newline, then that many
    body bytes, then the five-byte trailer 'done' plus newline.  Decoding is
    a small state machine: state_accept is the method that consumes incoming
    bytes and state_read is the method that hands back decoded body bytes.
    """

    def __init__(self):
        # Body bytes still expected; None before the length has been parsed
        # and again once the whole body has arrived.
        self.bytes_left = None
        self.finished_reading = False
        # Bytes received after the trailer.
        self.unused_data = ''
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        # Length digits, then body bytes not yet returned to the caller.
        self._in_buffer = ''
        self._trailer_buffer = ''

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to
        self.unused_data.

        finished_reading will be set when no more data is required.  Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        current_state = self.state_accept
        self.state_accept(bytes)
        # Each state only handles its own part of the data, so after a
        # transition the new state is run (with no new bytes) to process
        # whatever the previous state left buffered.
        while current_state != self.state_accept:
            current_state = self.state_accept
            self.state_accept('')

    def next_read_size(self):
        """Return a suggested number of bytes to read next."""
        if self.bytes_left is not None:
            # Ideally we want to read all the remainder of the body and the
            # trailer in one go.
            return self.bytes_left + 5
        elif self.state_accept == self._state_accept_reading_trailer:
            # Just the trailer left
            return 5 - len(self._trailer_buffer)
        elif self.state_accept == self._state_accept_expecting_length:
            # There's still at least 6 bytes left ('\n' to end the length, plus
            # 'done\n').
            return 6
        else:
            # Reading excess data.  Either way, 1 byte at a time is fine.
            return 1

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self, bytes):
        """Buffer bytes until the newline-terminated length can be parsed."""
        self._in_buffer += bytes
        pos = self._in_buffer.find('\n')
        if pos == -1:
            # The length line is not complete yet.
            return
        self.bytes_left = int(self._in_buffer[:pos])
        self._in_buffer = self._in_buffer[pos+1:]
        # Anything after the newline already counts towards the body; this
        # may go negative, which the reading-body state sorts out.
        self.bytes_left -= len(self._in_buffer)
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_in_buffer

    def _state_accept_reading_body(self, bytes):
        """Buffer body bytes; move on to the trailer once all have arrived."""
        self._in_buffer += bytes
        self.bytes_left -= len(bytes)
        if self.bytes_left <= 0:
            # Finished with body
            if self.bytes_left != 0:
                # bytes_left is negative: that many bytes at the end of the
                # buffer are past the body and belong to the trailer.
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
                self._in_buffer = self._in_buffer[:self.bytes_left]
            self.bytes_left = None
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self, bytes):
        """Buffer trailer bytes; finish once the trailer has been seen."""
        self._trailer_buffer += bytes
        # TODO: what if the trailer does not match "done\n"?  Should this raise
        # a ProtocolViolation exception?
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True

    def _state_accept_reading_unused(self, bytes):
        """Collect everything received after the trailer."""
        self.unused_data += bytes

    def _state_read_no_data(self):
        """Read state used before any body bytes can exist."""
        return ''

    def _state_read_in_buffer(self):
        """Return the buffered body bytes and empty the buffer."""
        result = self._in_buffer
        self._in_buffer = ''
        return result
225
class SmartClientRequestProtocolOne(SmartProtocolBase):
    """The client-side protocol for smart version 1."""

    def __init__(self, request):
        """Construct a SmartClientRequestProtocolOne.

        :param request: A SmartClientMediumRequest to serialise onto and
            deserialise from.
        """
        self._request = request
        # Holds the decoded response body once read_body_bytes has run.
        self._body_buffer = None

    def call(self, *args):
        """Make a remote call of args with no body."""
        encoded_args = _encode_tuple(args)
        self._request.accept_bytes(encoded_args)
        self._request.finished_writing()

    def call_with_body_bytes(self, args, body):
        """Make a remote call of args with body bytes 'body'.

        After calling this, call read_response_tuple to find the result out.
        """
        encoded_args = _encode_tuple(args)
        self._request.accept_bytes(encoded_args)
        encoded_body = self._encode_bulk_data(body)
        self._request.accept_bytes(encoded_body)
        self._request.finished_writing()

    def call_with_body_readv_array(self, args, body):
        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing newline is emitted.
        """
        encoded_args = _encode_tuple(args)
        self._request.accept_bytes(encoded_args)
        readv_bytes = self._serialise_offsets(body)
        encoded_body = self._encode_bulk_data(readv_bytes)
        self._request.accept_bytes(encoded_body)
        self._request.finished_writing()

    def cancel_read_body(self):
        """After expecting a body, a response code may indicate one otherwise.

        This method lets the domain client inform the protocol that no body
        will be transmitted. This is a terminal method: after calling it the
        protocol is not able to be used further.
        """
        self._request.finished_reading()

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        This should only be called once.

        :param expect_body: when true the request is left open for reading
            so that read_body_bytes or cancel_read_body can follow;
            otherwise reading is finished as soon as the tuple is received.
        :return: the decoded response tuple.
        """
        result = self._recv_tuple()
        if not expect_body:
            self._request.finished_reading()
        return result

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.

        :param count: passed to the read() of the buffered body.
        """
        if self._body_buffer is not None:
            return self._body_buffer.read(count)
        _body_decoder = LengthPrefixedBodyDecoder()

        while not _body_decoder.finished_reading:
            bytes_wanted = _body_decoder.next_read_size()
            received = self._request.read_bytes(bytes_wanted)
            _body_decoder.accept_bytes(received)
        self._request.finished_reading()
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
        # XXX: TODO check the trailer result.
        return self._body_buffer.read(count)

    def _recv_tuple(self):
        """Receive a tuple from the medium request."""
        line = ''
        while not line or line[-1] != '\n':
            # TODO: this is inefficient - but tuples are short.
            new_char = self._request.read_bytes(1)
            # An empty read can never complete the line; without this check
            # the loop would spin forever.
            assert new_char != '', "end of file reading from server."
            line += new_char
        return _decode_tuple(line)

    def query_version(self):
        """Return protocol version number of the server."""
        # NOTE(review): 'hello' is assumed to be the name of the server's
        # version-query command - confirm against the request registry.
        self.call('hello')
        resp = self.read_response_tuple()
        if resp == ('ok', '1'):
            return 1
        else:
            raise errors.SmartProtocolError("bad response %r" % (resp,))