1
# Copyright (C) 2006, 2007 Canonical Ltd
1
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
28
from bzrlib import debug
29
from bzrlib import errors
30
34
from bzrlib.smart import message, request
31
35
from bzrlib.trace import log_exception_quietly, mutter
32
36
from bzrlib.bencode import bdecode_as_tuple, bencode
615
619
mutter('hpss call: %s', repr(args)[1:-1])
616
620
if getattr(self._request._medium, 'base', None) is not None:
617
621
mutter(' (to %s)', self._request._medium.base)
618
self._request_start_time = time.time()
622
self._request_start_time = osutils.timer_func()
619
623
self._write_args(args)
620
624
self._request.finished_writing()
621
625
self._last_verb = args[0]
630
634
if getattr(self._request._medium, '_path', None) is not None:
631
635
mutter(' (to %s)', self._request._medium._path)
632
636
mutter(' %d bytes', len(body))
633
self._request_start_time = time.time()
637
self._request_start_time = osutils.timer_func()
634
638
if 'hpssdetail' in debug.debug_flags:
635
639
mutter('hpss body content: %s', body)
636
640
self._write_args(args)
649
653
mutter('hpss call w/readv: %s', repr(args)[1:-1])
650
654
if getattr(self._request._medium, '_path', None) is not None:
651
655
mutter(' (to %s)', self._request._medium._path)
652
self._request_start_time = time.time()
656
self._request_start_time = osutils.timer_func()
653
657
self._write_args(args)
654
658
readv_bytes = self._serialise_offsets(body)
655
659
bytes = self._encode_bulk_data(readv_bytes)
681
685
if 'hpss' in debug.debug_flags:
682
686
if self._request_start_time is not None:
683
687
mutter(' result: %6.3fs %s',
684
time.time() - self._request_start_time,
688
osutils.timer_func() - self._request_start_time,
685
689
repr(result)[1:-1])
686
690
self._request_start_time = None
1062
1066
class _ProtocolThreeEncoder(object):
1064
1068
response_marker = request_marker = MESSAGE_VERSION_THREE
1069
BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1066
1071
def __init__(self, write_func):
1068
1074
self._real_write_func = write_func
1070
1076
def _write_func(self, bytes):
1077
# TODO: It is probably more appropriate to use sum(map(len, _buf))
1078
# for total number of bytes to write, rather than buffer based on
1079
# the number of write() calls
1080
# TODO: Another possibility would be to turn this into an async model.
1081
# Where we let another thread know that we have some bytes if
1082
# they want it, but we don't actually block for it
1083
# Note that osutils.send_all always sends 64kB chunks anyway, so
1084
# we might just push out smaller bits at a time?
1071
1085
self._buf.append(bytes)
1072
if len(self._buf) > 100:
1086
self._buf_len += len(bytes)
1087
if self._buf_len > self.BUFFER_SIZE:
1075
1090
def flush(self):
1077
1092
self._real_write_func(''.join(self._buf))
1078
1093
del self._buf[:]
1080
1096
def _serialise_offsets(self, offsets):
1081
1097
"""Serialise a readv offset list."""
1130
1146
_ProtocolThreeEncoder.__init__(self, write_func)
1131
1147
self.response_sent = False
1132
1148
self._headers = {'Software version': bzrlib.__version__}
1149
if 'hpss' in debug.debug_flags:
1150
# python 2.6 introduced 'ident' as a nice small integer to
1151
# represent a thread. But it doesn't exist in 2.4/2.5
1152
cur_thread = threading.currentThread()
1153
self._thread_id = getattr(cur_thread, 'ident', None)
1154
if self._thread_id is None:
1155
self._thread_id = cur_thread.getName()
1156
self._response_start_time = None
1158
def _trace(self, action, message, extra_bytes=None, include_time=False):
1159
if self._response_start_time is None:
1160
self._response_start_time = osutils.timer_func()
1162
t = '%5.3fs ' % (time.clock() - self._response_start_time)
1165
if extra_bytes is None:
1168
extra = ' ' + repr(extra_bytes[:40])
1170
extra = extra[:29] + extra[-1] + '...'
1171
mutter('%12s: [%s] %s%s%s'
1172
% (action, self._thread_id, t, message, extra))
1134
1174
def send_error(self, exception):
1135
1175
if self.response_sent:
1141
1181
('UnknownMethod', exception.verb))
1142
1182
self.send_response(failure)
1184
if 'hpss' in debug.debug_flags:
1185
self._trace('error', str(exception))
1144
1186
self.response_sent = True
1145
1187
self._write_protocol_version()
1146
1188
self._write_headers(self._headers)
1160
1202
self._write_success_status()
1162
1204
self._write_error_status()
1205
if 'hpss' in debug.debug_flags:
1206
self._trace('response', repr(response.args))
1163
1207
self._write_structure(response.args)
1164
1208
if response.body is not None:
1165
1209
self._write_prefixed_body(response.body)
1210
if 'hpss' in debug.debug_flags:
1211
self._trace('body', '%d bytes' % (len(response.body),),
1212
response.body, include_time=True)
1166
1213
elif response.body_stream is not None:
1214
count = num_bytes = 0
1167
1216
for exc_info, chunk in _iter_with_errors(response.body_stream):
1168
1218
if exc_info is not None:
1169
1219
self._write_error_status()
1170
1220
error_struct = request._translate_error(exc_info[1])
1175
1225
self._write_error_status()
1176
1226
self._write_structure(chunk.args)
1228
num_bytes += len(chunk)
1229
if first_chunk is None:
1178
1231
self._write_prefixed_body(chunk)
1232
if 'hpssdetail' in debug.debug_flags:
1233
# Not worth timing separately, as _write_func is
1235
self._trace('body chunk',
1236
'%d bytes' % (len(chunk),),
1237
chunk, suppress_time=True)
1238
if 'hpss' in debug.debug_flags:
1239
self._trace('body stream',
1240
'%d bytes %d chunks' % (num_bytes, count),
1179
1242
self._write_end()
1243
if 'hpss' in debug.debug_flags:
1244
self._trace('response end', '', include_time=True)
1182
1247
def _iter_with_errors(iterable):
1234
1299
base = getattr(self._medium_request._medium, 'base', None)
1235
1300
if base is not None:
1236
1301
mutter(' (to %s)', base)
1237
self._request_start_time = time.time()
1302
self._request_start_time = osutils.timer_func()
1238
1303
self._write_protocol_version()
1239
1304
self._write_headers(self._headers)
1240
1305
self._write_structure(args)
1252
1317
if path is not None:
1253
1318
mutter(' (to %s)', path)
1254
1319
mutter(' %d bytes', len(body))
1255
self._request_start_time = time.time()
1320
self._request_start_time = osutils.timer_func()
1256
1321
self._write_protocol_version()
1257
1322
self._write_headers(self._headers)
1258
1323
self._write_structure(args)
1271
1336
path = getattr(self._medium_request._medium, '_path', None)
1272
1337
if path is not None:
1273
1338
mutter(' (to %s)', path)
1274
self._request_start_time = time.time()
1339
self._request_start_time = osutils.timer_func()
1275
1340
self._write_protocol_version()
1276
1341
self._write_headers(self._headers)
1277
1342
self._write_structure(args)
1288
1353
path = getattr(self._medium_request._medium, '_path', None)
1289
1354
if path is not None:
1290
1355
mutter(' (to %s)', path)
1291
self._request_start_time = time.time()
1356
self._request_start_time = osutils.timer_func()
1292
1357
self._write_protocol_version()
1293
1358
self._write_headers(self._headers)
1294
1359
self._write_structure(args)