1
# Copyright (C) 2006, 2007 Canonical Ltd
1
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
28
from bzrlib import debug
29
from bzrlib import errors
30
34
from bzrlib.smart import message, request
31
35
from bzrlib.trace import log_exception_quietly, mutter
32
36
from bzrlib.bencode import bdecode_as_tuple, bencode
114
118
class SmartServerRequestProtocolOne(SmartProtocolBase):
115
119
"""Server-side encoding and decoding logic for smart version 1."""
117
def __init__(self, backing_transport, write_func, root_client_path='/'):
121
def __init__(self, backing_transport, write_func, root_client_path='/',
118
123
self._backing_transport = backing_transport
119
124
self._root_client_path = root_client_path
125
self._jail_root = jail_root
120
126
self.unused_data = ''
121
127
self._finished = False
122
128
self.in_buffer = ''
144
150
req_args = _decode_tuple(first_line)
145
151
self.request = request.SmartServerRequestHandler(
146
152
self._backing_transport, commands=request.request_handlers,
147
root_client_path=self._root_client_path)
148
self.request.dispatch_command(req_args[0], req_args[1:])
153
root_client_path=self._root_client_path,
154
jail_root=self._jail_root)
155
self.request.args_received(req_args)
149
156
if self.request.finished_reading:
150
157
# trivial request
151
158
self.unused_data = self.in_buffer
612
619
mutter('hpss call: %s', repr(args)[1:-1])
613
620
if getattr(self._request._medium, 'base', None) is not None:
614
621
mutter(' (to %s)', self._request._medium.base)
615
self._request_start_time = time.time()
622
self._request_start_time = osutils.timer_func()
616
623
self._write_args(args)
617
624
self._request.finished_writing()
618
625
self._last_verb = args[0]
627
634
if getattr(self._request._medium, '_path', None) is not None:
628
635
mutter(' (to %s)', self._request._medium._path)
629
636
mutter(' %d bytes', len(body))
630
self._request_start_time = time.time()
637
self._request_start_time = osutils.timer_func()
631
638
if 'hpssdetail' in debug.debug_flags:
632
639
mutter('hpss body content: %s', body)
633
640
self._write_args(args)
646
653
mutter('hpss call w/readv: %s', repr(args)[1:-1])
647
654
if getattr(self._request._medium, '_path', None) is not None:
648
655
mutter(' (to %s)', self._request._medium._path)
649
self._request_start_time = time.time()
656
self._request_start_time = osutils.timer_func()
650
657
self._write_args(args)
651
658
readv_bytes = self._serialise_offsets(body)
652
659
bytes = self._encode_bulk_data(readv_bytes)
678
685
if 'hpss' in debug.debug_flags:
679
686
if self._request_start_time is not None:
680
687
mutter(' result: %6.3fs %s',
681
time.time() - self._request_start_time,
688
osutils.timer_func() - self._request_start_time,
682
689
repr(result)[1:-1])
683
690
self._request_start_time = None
860
867
def build_server_protocol_three(backing_transport, write_func,
                                root_client_path, jail_root=None):
    """Assemble the server-side object chain for protocol version three.

    A SmartServerRequestHandler is created for the registered request
    handlers, paired with a ProtocolThreeResponder that emits output through
    write_func, and both are wrapped in a ConventionalRequestHandler that
    the returned decoder feeds.

    :param backing_transport: Passed through to SmartServerRequestHandler.
    :param write_func: Callable handed to ProtocolThreeResponder for
        writing the response.
    :param root_client_path: Passed through to SmartServerRequestHandler.
    :param jail_root: Passed through to SmartServerRequestHandler; defaults
        to None so callers that predate the parameter keep working.
    :return: A ProtocolThreeDecoder wired to the new message handler.
    """
    request_handler = request.SmartServerRequestHandler(
        backing_transport, commands=request.request_handlers,
        root_client_path=root_client_path, jail_root=jail_root)
    responder = ProtocolThreeResponder(write_func)
    message_handler = message.ConventionalRequestHandler(
        request_handler, responder)
    return ProtocolThreeDecoder(message_handler)
1059
1066
class _ProtocolThreeEncoder(object):
1061
1068
response_marker = request_marker = MESSAGE_VERSION_THREE
1069
BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1063
1071
def __init__(self, write_func):
1065
1074
self._real_write_func = write_func
1067
1076
def _write_func(self, bytes):
1077
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1078
# for total number of bytes to write, rather than buffer based on
1079
# the number of write() calls
1080
# TODO: Another possibility would be to turn this into an async model.
1081
# Where we let another thread know that we have some bytes if
1082
# they want it, but we don't actually block for it
1083
# Note that osutils.send_all always sends 64kB chunks anyway, so
1084
# we might just push out smaller bits at a time?
1068
1085
self._buf.append(bytes)
1069
if len(self._buf) > 100:
1086
self._buf_len += len(bytes)
1087
if self._buf_len > self.BUFFER_SIZE:
1072
1090
def flush(self):
1074
1092
self._real_write_func(''.join(self._buf))
1075
1093
del self._buf[:]
1077
1096
def _serialise_offsets(self, offsets):
1078
1097
"""Serialise a readv offset list."""
1127
1146
_ProtocolThreeEncoder.__init__(self, write_func)
1128
1147
self.response_sent = False
1129
1148
self._headers = {'Software version': bzrlib.__version__}
1149
if 'hpss' in debug.debug_flags:
1150
# python 2.6 introduced 'ident' as a nice small integer to
1151
# represent a thread. But it doesn't exist in 2.4/2.5
1152
cur_thread = threading.currentThread()
1153
self._thread_id = getattr(cur_thread, 'ident', None)
1154
if self._thread_id is None:
1155
self._thread_id = cur_thread.getName()
1156
self._response_start_time = None
1158
def _trace(self, action, message, extra_bytes=None, include_time=False):
    """Write one line describing a step of the response via mutter().

    :param action: Short label for the step; right-aligned in a 12
        character field at the start of the logged line.
    :param message: Text describing the step.
    :param extra_bytes: Optional data to preview. Only the repr of its
        first 40 items is considered, and a long repr is abbreviated.
    :param include_time: If True, prefix the message with the number of
        seconds elapsed since this object's first _trace call.
    """
    if self._response_start_time is None:
        # The first traced event becomes the zero point for elapsed times.
        self._response_start_time = osutils.timer_func()
    if include_time:
        # Read the same clock that recorded the start time. Subtracting a
        # timer_func() reading from time.clock() mixes two time sources
        # and yields a meaningless duration whenever they differ.
        t = '%5.3fs ' % (osutils.timer_func() - self._response_start_time)
    else:
        t = ''
    if extra_bytes is None:
        extra = ''
    else:
        extra = ' ' + repr(extra_bytes[:40])
        # NOTE(review): 33 is the length of the abbreviated form built
        # below (29 + closing quote + '...'); confirm this threshold.
        if len(extra) > 33:
            # Keep the closing quote so the preview still reads as a repr.
            extra = extra[:29] + extra[-1] + '...'
    mutter('%12s: [%s] %s%s%s'
           % (action, self._thread_id, t, message, extra))
1131
1174
def send_error(self, exception):
1132
1175
if self.response_sent:
1138
1181
('UnknownMethod', exception.verb))
1139
1182
self.send_response(failure)
1184
if 'hpss' in debug.debug_flags:
1185
self._trace('error', str(exception))
1141
1186
self.response_sent = True
1142
1187
self._write_protocol_version()
1143
1188
self._write_headers(self._headers)
1157
1202
self._write_success_status()
1159
1204
self._write_error_status()
1205
if 'hpss' in debug.debug_flags:
1206
self._trace('response', repr(response.args))
1160
1207
self._write_structure(response.args)
1161
1208
if response.body is not None:
1162
1209
self._write_prefixed_body(response.body)
1210
if 'hpss' in debug.debug_flags:
1211
self._trace('body', '%d bytes' % (len(response.body),),
1212
response.body, include_time=True)
1163
1213
elif response.body_stream is not None:
1214
count = num_bytes = 0
1164
1216
for exc_info, chunk in _iter_with_errors(response.body_stream):
1165
1218
if exc_info is not None:
1166
1219
self._write_error_status()
1167
1220
error_struct = request._translate_error(exc_info[1])
1172
1225
self._write_error_status()
1173
1226
self._write_structure(chunk.args)
1228
num_bytes += len(chunk)
1229
if first_chunk is None:
1175
1231
self._write_prefixed_body(chunk)
1232
if 'hpssdetail' in debug.debug_flags:
1233
# Not worth timing separately, as _write_func is
1235
self._trace('body chunk',
1236
'%d bytes' % (len(chunk),),
1237
chunk, suppress_time=True)
1238
if 'hpss' in debug.debug_flags:
1239
self._trace('body stream',
1240
'%d bytes %d chunks' % (num_bytes, count),
1176
1242
self._write_end()
1243
if 'hpss' in debug.debug_flags:
1244
self._trace('response end', '', include_time=True)
1179
1247
def _iter_with_errors(iterable):
1231
1299
base = getattr(self._medium_request._medium, 'base', None)
1232
1300
if base is not None:
1233
1301
mutter(' (to %s)', base)
1234
self._request_start_time = time.time()
1302
self._request_start_time = osutils.timer_func()
1235
1303
self._write_protocol_version()
1236
1304
self._write_headers(self._headers)
1237
1305
self._write_structure(args)
1249
1317
if path is not None:
1250
1318
mutter(' (to %s)', path)
1251
1319
mutter(' %d bytes', len(body))
1252
self._request_start_time = time.time()
1320
self._request_start_time = osutils.timer_func()
1253
1321
self._write_protocol_version()
1254
1322
self._write_headers(self._headers)
1255
1323
self._write_structure(args)
1268
1336
path = getattr(self._medium_request._medium, '_path', None)
1269
1337
if path is not None:
1270
1338
mutter(' (to %s)', path)
1271
self._request_start_time = time.time()
1339
self._request_start_time = osutils.timer_func()
1272
1340
self._write_protocol_version()
1273
1341
self._write_headers(self._headers)
1274
1342
self._write_structure(args)
1285
1353
path = getattr(self._medium_request._medium, '_path', None)
1286
1354
if path is not None:
1287
1355
mutter(' (to %s)', path)
1288
self._request_start_time = time.time()
1356
self._request_start_time = osutils.timer_func()
1289
1357
self._write_protocol_version()
1290
1358
self._write_headers(self._headers)
1291
1359
self._write_structure(args)