/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/protocol.py

  • Committer: Robert Collins
  • Date: 2010-05-06 11:08:10 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506110810-h3j07fh5gmw54s25
Cleaner matcher matching revised unlocking protocol.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
from cStringIO import StringIO
23
23
import struct
24
24
import sys
 
25
import thread
 
26
import threading
25
27
import time
26
28
 
27
29
import bzrlib
28
 
from bzrlib import debug
29
 
from bzrlib import errors
 
30
from bzrlib import (
 
31
    debug,
 
32
    errors,
 
33
    osutils,
 
34
    )
30
35
from bzrlib.smart import message, request
31
36
from bzrlib.trace import log_exception_quietly, mutter
32
37
from bzrlib.bencode import bdecode_as_tuple, bencode
57
62
 
58
63
def _encode_tuple(args):
59
64
    """Encode the tuple args to a bytestream."""
60
 
    return '\x01'.join(args) + '\n'
 
65
    joined = '\x01'.join(args) + '\n'
 
66
    if type(joined) is unicode:
 
67
        # XXX: We should fix things so this never happens!  -AJB, 20100304
 
68
        mutter('response args contain unicode, should be only bytes: %r',
 
69
               joined)
 
70
        joined = joined.encode('ascii')
 
71
    return joined
61
72
 
62
73
 
63
74
class Requester(object):
615
626
            mutter('hpss call:   %s', repr(args)[1:-1])
616
627
            if getattr(self._request._medium, 'base', None) is not None:
617
628
                mutter('             (to %s)', self._request._medium.base)
618
 
            self._request_start_time = time.time()
 
629
            self._request_start_time = osutils.timer_func()
619
630
        self._write_args(args)
620
631
        self._request.finished_writing()
621
632
        self._last_verb = args[0]
630
641
            if getattr(self._request._medium, '_path', None) is not None:
631
642
                mutter('                  (to %s)', self._request._medium._path)
632
643
            mutter('              %d bytes', len(body))
633
 
            self._request_start_time = time.time()
 
644
            self._request_start_time = osutils.timer_func()
634
645
            if 'hpssdetail' in debug.debug_flags:
635
646
                mutter('hpss body content: %s', body)
636
647
        self._write_args(args)
649
660
            mutter('hpss call w/readv: %s', repr(args)[1:-1])
650
661
            if getattr(self._request._medium, '_path', None) is not None:
651
662
                mutter('                  (to %s)', self._request._medium._path)
652
 
            self._request_start_time = time.time()
 
663
            self._request_start_time = osutils.timer_func()
653
664
        self._write_args(args)
654
665
        readv_bytes = self._serialise_offsets(body)
655
666
        bytes = self._encode_bulk_data(readv_bytes)
681
692
        if 'hpss' in debug.debug_flags:
682
693
            if self._request_start_time is not None:
683
694
                mutter('   result:   %6.3fs  %s',
684
 
                       time.time() - self._request_start_time,
 
695
                       osutils.timer_func() - self._request_start_time,
685
696
                       repr(result)[1:-1])
686
697
                self._request_start_time = None
687
698
            else:
1062
1073
class _ProtocolThreeEncoder(object):
1063
1074
 
1064
1075
    response_marker = request_marker = MESSAGE_VERSION_THREE
 
1076
    BUFFER_SIZE = 1024*1024 # 1 MiB buffer before flushing
1065
1077
 
1066
1078
    def __init__(self, write_func):
1067
1079
        self._buf = []
 
1080
        self._buf_len = 0
1068
1081
        self._real_write_func = write_func
1069
1082
 
1070
1083
    def _write_func(self, bytes):
 
1084
        # TODO: It is probably more appropriate to use sum(map(len, _buf))
 
1085
        #       for total number of bytes to write, rather than buffer based on
 
1086
        #       the number of write() calls
 
1087
        # TODO: Another possibility would be to turn this into an async model.
 
1088
        #       Where we let another thread know that we have some bytes if
 
1089
        #       they want it, but we don't actually block for it
 
1090
        #       Note that osutils.send_all always sends 64kB chunks anyway, so
 
1091
        #       we might just push out smaller bits at a time?
1071
1092
        self._buf.append(bytes)
1072
 
        if len(self._buf) > 100:
 
1093
        self._buf_len += len(bytes)
 
1094
        if self._buf_len > self.BUFFER_SIZE:
1073
1095
            self.flush()
1074
1096
 
1075
1097
    def flush(self):
1076
1098
        if self._buf:
1077
1099
            self._real_write_func(''.join(self._buf))
1078
1100
            del self._buf[:]
 
1101
            self._buf_len = 0
1079
1102
 
1080
1103
    def _serialise_offsets(self, offsets):
1081
1104
        """Serialise a readv offset list."""
1130
1153
        _ProtocolThreeEncoder.__init__(self, write_func)
1131
1154
        self.response_sent = False
1132
1155
        self._headers = {'Software version': bzrlib.__version__}
 
1156
        if 'hpss' in debug.debug_flags:
 
1157
            self._thread_id = thread.get_ident()
 
1158
            self._response_start_time = None
 
1159
 
 
1160
    def _trace(self, action, message, extra_bytes=None, include_time=False):
 
1161
        if self._response_start_time is None:
 
1162
            self._response_start_time = osutils.timer_func()
 
1163
        if include_time:
 
1164
            t = '%5.3fs ' % (time.clock() - self._response_start_time)
 
1165
        else:
 
1166
            t = ''
 
1167
        if extra_bytes is None:
 
1168
            extra = ''
 
1169
        else:
 
1170
            extra = ' ' + repr(extra_bytes[:40])
 
1171
            if len(extra) > 33:
 
1172
                extra = extra[:29] + extra[-1] + '...'
 
1173
        mutter('%12s: [%s] %s%s%s'
 
1174
               % (action, self._thread_id, t, message, extra))
1133
1175
 
1134
1176
    def send_error(self, exception):
1135
1177
        if self.response_sent:
1141
1183
                ('UnknownMethod', exception.verb))
1142
1184
            self.send_response(failure)
1143
1185
            return
 
1186
        if 'hpss' in debug.debug_flags:
 
1187
            self._trace('error', str(exception))
1144
1188
        self.response_sent = True
1145
1189
        self._write_protocol_version()
1146
1190
        self._write_headers(self._headers)
1160
1204
            self._write_success_status()
1161
1205
        else:
1162
1206
            self._write_error_status()
 
1207
        if 'hpss' in debug.debug_flags:
 
1208
            self._trace('response', repr(response.args))
1163
1209
        self._write_structure(response.args)
1164
1210
        if response.body is not None:
1165
1211
            self._write_prefixed_body(response.body)
 
1212
            if 'hpss' in debug.debug_flags:
 
1213
                self._trace('body', '%d bytes' % (len(response.body),),
 
1214
                            response.body, include_time=True)
1166
1215
        elif response.body_stream is not None:
 
1216
            count = num_bytes = 0
 
1217
            first_chunk = None
1167
1218
            for exc_info, chunk in _iter_with_errors(response.body_stream):
 
1219
                count += 1
1168
1220
                if exc_info is not None:
1169
1221
                    self._write_error_status()
1170
1222
                    error_struct = request._translate_error(exc_info[1])
1175
1227
                        self._write_error_status()
1176
1228
                        self._write_structure(chunk.args)
1177
1229
                        break
 
1230
                    num_bytes += len(chunk)
 
1231
                    if first_chunk is None:
 
1232
                        first_chunk = chunk
1178
1233
                    self._write_prefixed_body(chunk)
 
1234
                    if 'hpssdetail' in debug.debug_flags:
 
1235
                        # Not worth timing separately, as _write_func is
 
1236
                        # actually buffered
 
1237
                        self._trace('body chunk',
 
1238
                                    '%d bytes' % (len(chunk),),
 
1239
                                    chunk, suppress_time=True)
 
1240
            if 'hpss' in debug.debug_flags:
 
1241
                self._trace('body stream',
 
1242
                            '%d bytes %d chunks' % (num_bytes, count),
 
1243
                            first_chunk)
1179
1244
        self._write_end()
 
1245
        if 'hpss' in debug.debug_flags:
 
1246
            self._trace('response end', '', include_time=True)
1180
1247
 
1181
1248
 
1182
1249
def _iter_with_errors(iterable):
1234
1301
            base = getattr(self._medium_request._medium, 'base', None)
1235
1302
            if base is not None:
1236
1303
                mutter('             (to %s)', base)
1237
 
            self._request_start_time = time.time()
 
1304
            self._request_start_time = osutils.timer_func()
1238
1305
        self._write_protocol_version()
1239
1306
        self._write_headers(self._headers)
1240
1307
        self._write_structure(args)
1252
1319
            if path is not None:
1253
1320
                mutter('                  (to %s)', path)
1254
1321
            mutter('              %d bytes', len(body))
1255
 
            self._request_start_time = time.time()
 
1322
            self._request_start_time = osutils.timer_func()
1256
1323
        self._write_protocol_version()
1257
1324
        self._write_headers(self._headers)
1258
1325
        self._write_structure(args)
1271
1338
            path = getattr(self._medium_request._medium, '_path', None)
1272
1339
            if path is not None:
1273
1340
                mutter('                  (to %s)', path)
1274
 
            self._request_start_time = time.time()
 
1341
            self._request_start_time = osutils.timer_func()
1275
1342
        self._write_protocol_version()
1276
1343
        self._write_headers(self._headers)
1277
1344
        self._write_structure(args)
1288
1355
            path = getattr(self._medium_request._medium, '_path', None)
1289
1356
            if path is not None:
1290
1357
                mutter('                  (to %s)', path)
1291
 
            self._request_start_time = time.time()
 
1358
            self._request_start_time = osutils.timer_func()
1292
1359
        self._write_protocol_version()
1293
1360
        self._write_headers(self._headers)
1294
1361
        self._write_structure(args)