/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-21 17:24:22 UTC
  • mfrom: (4913 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4916.
  • Revision ID: john@arbash-meinel.com-20091221172422-0zy2v8x3fhcdc8c0
Merge bzr.dev 4913, to put NEWS in its place.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
23
23
import struct
24
24
import sys
 
25
import threading
25
26
import time
26
27
 
27
28
import bzrlib
28
 
from bzrlib import debug
29
 
from bzrlib import errors
 
29
from bzrlib import (
 
30
    debug,
 
31
    errors,
 
32
    osutils,
 
33
    )
30
34
from bzrlib.smart import message, request
31
35
from bzrlib.trace import log_exception_quietly, mutter
32
36
from bzrlib.bencode import bdecode_as_tuple, bencode
615
619
            mutter('hpss call:   %s', repr(args)[1:-1])
616
620
            if getattr(self._request._medium, 'base', None) is not None:
617
621
                mutter('             (to %s)', self._request._medium.base)
618
 
            self._request_start_time = time.time()
 
622
            self._request_start_time = osutils.timer_func()
619
623
        self._write_args(args)
620
624
        self._request.finished_writing()
621
625
        self._last_verb = args[0]
630
634
            if getattr(self._request._medium, '_path', None) is not None:
631
635
                mutter('                  (to %s)', self._request._medium._path)
632
636
            mutter('              %d bytes', len(body))
633
 
            self._request_start_time = time.time()
 
637
            self._request_start_time = osutils.timer_func()
634
638
            if 'hpssdetail' in debug.debug_flags:
635
639
                mutter('hpss body content: %s', body)
636
640
        self._write_args(args)
649
653
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
650
654
            if getattr(self._request._medium, '_path', None) is not None:
651
655
                mutter('                  (to %s)', self._request._medium._path)
652
 
            self._request_start_time = time.time()
 
656
            self._request_start_time = osutils.timer_func()
653
657
        self._write_args(args)
654
658
        readv_bytes = self._serialise_offsets(body)
655
659
        bytes = self._encode_bulk_data(readv_bytes)
681
685
        if 'hpss' in debug.debug_flags:
682
686
            if self._request_start_time is not None:
683
687
                mutter('   result:   %6.3fs  %s',
684
 
                       time.time() - self._request_start_time,
 
688
                       osutils.timer_func() - self._request_start_time,
685
689
                       repr(result)[1:-1])
686
690
                self._request_start_time = None
687
691
            else:
1062
1066
class _ProtocolThreeEncoder(object):
1063
1067
 
1064
1068
    response_marker = request_marker = MESSAGE_VERSION_THREE
 
1069
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1065
1070
 
1066
1071
    def __init__(self, write_func):
1067
1072
        self._buf = []
 
1073
        self._buf_len = 0
1068
1074
        self._real_write_func = write_func
1069
1075
 
1070
1076
    def _write_func(self, bytes):
 
1077
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1078
        #       for total number of bytes to write, rather than buffer based on
 
1079
        #       the number of write() calls
 
1080
        # TODO: Another possibility would be to turn this into an async model.
 
1081
        #       Where we let another thread know that we have some bytes if
 
1082
        #       they want it, but we don't actually block for it
 
1083
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
 
1084
        #       we might just push out smaller bits at a time?
1071
1085
        self._buf.append(bytes)
1072
 
        if len(self._buf) > 100:
 
1086
        self._buf_len += len(bytes)
 
1087
        if self._buf_len > self.BUFFER_SIZE:
1073
1088
            self.flush()
1074
1089
 
1075
1090
    def flush(self):
1076
1091
        if self._buf:
1077
1092
            self._real_write_func(''.join(self._buf))
1078
1093
            del self._buf[:]
 
1094
            self._buf_len = 0
1079
1095
 
1080
1096
    def _serialise_offsets(self, offsets):
1081
1097
        """Serialise a readv offset list."""
1130
1146
        _ProtocolThreeEncoder.__init__(self, write_func)
1131
1147
        self.response_sent = False
1132
1148
        self._headers = {'Software version': bzrlib.__version__}
 
1149
        if 'hpss' in debug.debug_flags:
 
1150
            # python 2.6 introduced 'ident' as a nice small integer to
 
1151
            # represent a thread. But it doesn't exist in 2.4/2.5
 
1152
            cur_thread = threading.currentThread()
 
1153
            self._thread_id = getattr(cur_thread, 'ident', None)
 
1154
            if self._thread_id is None:
 
1155
                self._thread_id = cur_thread.getName()
 
1156
            self._response_start_time = None
 
1157
 
 
1158
    def _trace(self, action, message, extra_bytes=None, include_time=False):
 
1159
        if self._response_start_time is None:
 
1160
            self._response_start_time = osutils.timer_func()
 
1161
        if include_time:
 
1162
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
 
1163
        else:
 
1164
            t = ''
 
1165
        if extra_bytes is None:
 
1166
            extra = ''
 
1167
        else:
 
1168
            extra = ' ' + repr(extra_bytes[:40])
 
1169
            if len(extra) > 33:
 
1170
                extra = extra[:29] + extra[-1] + '...'
 
1171
        mutter('%12s: [%s] %s%s%s'
 
1172
               % (action, self._thread_id, t, message, extra))
1133
1173
 
1134
1174
    def send_error(self, exception):
1135
1175
        if self.response_sent:
1141
1181
                ('UnknownMethod', exception.verb))
1142
1182
            self.send_response(failure)
1143
1183
            return
 
1184
        if 'hpss' in debug.debug_flags:
 
1185
            self._trace('error', str(exception))
1144
1186
        self.response_sent = True
1145
1187
        self._write_protocol_version()
1146
1188
        self._write_headers(self._headers)
1160
1202
            self._write_success_status()
1161
1203
        else:
1162
1204
            self._write_error_status()
 
1205
        if 'hpss' in debug.debug_flags:
 
1206
            self._trace('response', repr(response.args))
1163
1207
        self._write_structure(response.args)
1164
1208
        if response.body is not None:
1165
1209
            self._write_prefixed_body(response.body)
 
1210
            if 'hpss' in debug.debug_flags:
 
1211
                self._trace('body', '%d bytes' % (len(response.body),),
 
1212
                            response.body, include_time=True)
1166
1213
        elif response.body_stream is not None:
 
1214
            count = num_bytes = 0
 
1215
            first_chunk = None
1167
1216
            for exc_info, chunk in _iter_with_errors(response.body_stream):
 
1217
                count += 1
1168
1218
                if exc_info is not None:
1169
1219
                    self._write_error_status()
1170
1220
                    error_struct = request._translate_error(exc_info[1])
1175
1225
                        self._write_error_status()
1176
1226
                        self._write_structure(chunk.args)
1177
1227
                        break
 
1228
                    num_bytes += len(chunk)
 
1229
                    if first_chunk is None:
 
1230
                        first_chunk = chunk
1178
1231
                    self._write_prefixed_body(chunk)
 
1232
                    if 'hpssdetail' in debug.debug_flags:
 
1233
                        # Not worth timing separately, as _write_func is
 
1234
                        # actually buffered
 
1235
                        self._trace('body chunk',
 
1236
                                    '%d bytes' % (len(chunk),),
 
1237
                                    chunk, suppress_time=True)
 
1238
            if 'hpss' in debug.debug_flags:
 
1239
                self._trace('body stream',
 
1240
                            '%d bytes %d chunks' % (num_bytes, count),
 
1241
                            first_chunk)
1179
1242
        self._write_end()
 
1243
        if 'hpss' in debug.debug_flags:
 
1244
            self._trace('response end', '', include_time=True)
1180
1245
 
1181
1246
 
1182
1247
def _iter_with_errors(iterable):
1234
1299
            base = getattr(self._medium_request._medium, 'base', None)
1235
1300
            if base is not None:
1236
1301
                mutter('             (to %s)', base)
1237
 
            self._request_start_time = time.time()
 
1302
            self._request_start_time = osutils.timer_func()
1238
1303
        self._write_protocol_version()
1239
1304
        self._write_headers(self._headers)
1240
1305
        self._write_structure(args)
1252
1317
            if path is not None:
1253
1318
                mutter('                  (to %s)', path)
1254
1319
            mutter('              %d bytes', len(body))
1255
 
            self._request_start_time = time.time()
 
1320
            self._request_start_time = osutils.timer_func()
1256
1321
        self._write_protocol_version()
1257
1322
        self._write_headers(self._headers)
1258
1323
        self._write_structure(args)
1271
1336
            path = getattr(self._medium_request._medium, '_path', None)
1272
1337
            if path is not None:
1273
1338
                mutter('                  (to %s)', path)
1274
 
            self._request_start_time = time.time()
 
1339
            self._request_start_time = osutils.timer_func()
1275
1340
        self._write_protocol_version()
1276
1341
        self._write_headers(self._headers)
1277
1342
        self._write_structure(args)
1288
1353
            path = getattr(self._medium_request._medium, '_path', None)
1289
1354
            if path is not None:
1290
1355
                mutter('                  (to %s)', path)
1291
 
            self._request_start_time = time.time()
 
1356
            self._request_start_time = osutils.timer_func()
1292
1357
        self._write_protocol_version()
1293
1358
        self._write_headers(self._headers)
1294
1359
        self._write_structure(args)