/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

separate out the client medium from the client encoding protocol for the smart server.

Show diffs side-by-side

added added

removed removed

Lines of Context:
249
249
class SmartProtocolBase(object):
250
250
    """Methods common to client and server"""
251
251
 
252
 
    def _send_bulk_data(self, body, a_file=None):
253
 
        """Send chunked body data"""
254
 
        assert isinstance(body, str)
255
 
        bytes = self._encode_bulk_data(body)
256
 
        self._write_and_flush(bytes, a_file)
257
 
 
258
 
    def _encode_bulk_data(self, body):
259
 
        """Encode body as a bulk data chunk."""
260
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
261
 
 
262
252
    # TODO: this only actually accomodates a single block; possibly should support
263
253
    # multiple chunks?
264
254
    def _recv_bulk(self):
 
255
        # This is OBSOLETE except for the double handline in the server: 
 
256
        # the read_bulk + reencode noise.
265
257
        chunk_len = self._in.readline()
266
258
        try:
267
259
            chunk_len = int(chunk_len)
273
265
        self._recv_trailer()
274
266
        return bulk
275
267
 
276
 
    def _recv_tuple(self):
277
 
        return _recv_tuple(self._in)
278
 
 
279
268
    def _recv_trailer(self):
280
269
        resp = self._recv_tuple()
281
270
        if resp == ('done', ):
283
272
        else:
284
273
            self._translate_error(resp)
285
274
 
 
275
    def _encode_bulk_data(self, body):
 
276
        """Encode body as a bulk data chunk."""
 
277
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
278
 
286
279
    def _serialise_offsets(self, offsets):
287
280
        """Serialise a readv offset list."""
288
281
        txt = []
290
283
            txt.append('%d,%d' % (start, length))
291
284
        return '\n'.join(txt)
292
285
 
 
286
    def _send_bulk_data(self, body, a_file=None):
 
287
        """Send chunked body data"""
 
288
        assert isinstance(body, str)
 
289
        bytes = self._encode_bulk_data(body)
 
290
        self._write_and_flush(bytes, a_file)
 
291
 
293
292
    def _write_and_flush(self, bytes, a_file=None):
294
293
        """Write bytes to self._out and flush it."""
295
294
        # XXX: this will be inefficient.  Just ask Robert.
297
296
            a_file = self._out
298
297
        a_file.write(bytes)
299
298
        a_file.flush()
300
 
 
 
299
        
301
300
 
302
301
class SmartServerRequestProtocolOne(SmartProtocolBase):
303
302
    """Server-side encoding and decoding logic for smart version 1."""
380
379
    def sync_with_request(self, request):
381
380
        self.finished_reading = request.finished_reading
382
381
        
383
 
        
 
382
 
384
383
class LengthPrefixedBodyDecoder(object):
385
384
    """Decodes the length-prefixed bulk data."""
386
385
    
450
449
        result = self._in_buffer
451
450
        self._in_buffer = ''
452
451
        return result
453
 
            
454
 
        
455
452
 
456
 
        
457
453
 
458
454
class SmartServerStreamMedium(SmartProtocolBase):
459
455
    """Handles smart commands coming over a stream.
932
928
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
933
929
                transport.split_url(url)
934
930
        if clone_from is None:
935
 
            self._client = medium
 
931
            self._medium = medium
936
932
        else:
937
933
            # credentials may be stripped from the base in some circumstances
938
934
            # as yet to be clearly defined or documented, so copy them.
939
935
            self._username = clone_from._username
940
936
            # reuse same connection
941
 
            self._client = clone_from._client
942
 
        assert self._client is not None
 
937
            self._medium = clone_from._medium
 
938
        assert self._medium is not None
943
939
 
944
940
    def abspath(self, relpath):
945
941
        """Return the full url to the given relative path.
964
960
        return False
965
961
                                                   
966
962
    def get_smart_client(self):
967
 
        return self._client
 
963
        return self._medium
968
964
 
969
965
    def get_smart_medium(self):
970
 
        return self._client
 
966
        return self._medium
971
967
                                                   
972
968
    def _unparse_url(self, path):
973
969
        """Return URL for a path.
990
986
        """Returns the Unicode version of the absolute path for relpath."""
991
987
        return self._combine_paths(self._path, relpath)
992
988
 
 
989
    def _call(self, method, *args):
 
990
        resp = self._call2(method, *args)
 
991
        self._translate_error(resp)
 
992
 
 
993
    def _call2(self, method, *args):
 
994
        """Call a method on the remote server."""
 
995
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
996
        protocol.call(method, *args)
 
997
        return protocol.read_response_tuple()
 
998
 
 
999
    def _call_with_body_bytes(self, method, args, body):
 
1000
        """Call a method on the remote server with body bytes."""
 
1001
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1002
        protocol.call_with_body_bytes((method, ) + args, body)
 
1003
        return protocol.read_response_tuple()
 
1004
 
993
1005
    def has(self, relpath):
994
1006
        """Indicate whether a remote file of the given name exists or not.
995
1007
 
996
1008
        :see: Transport.has()
997
1009
        """
998
 
        resp = self._client._call('has', self._remote_path(relpath))
 
1010
        resp = self._call2('has', self._remote_path(relpath))
999
1011
        if resp == ('yes', ):
1000
1012
            return True
1001
1013
        elif resp == ('no', ):
1008
1020
        
1009
1021
        :see: Transport.get_bytes()/get_file()
1010
1022
        """
 
1023
        return StringIO(self.get_bytes(relpath))
 
1024
 
 
1025
    def get_bytes(self, relpath):
1011
1026
        remote = self._remote_path(relpath)
1012
 
        resp = self._client._call('get', remote)
 
1027
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1028
        protocol.call('get', remote)
 
1029
        resp = protocol.read_response_tuple(True)
1013
1030
        if resp != ('ok', ):
 
1031
            protocol.cancel_read_body()
1014
1032
            self._translate_error(resp, relpath)
1015
 
        return StringIO(self._client._recv_bulk())
 
1033
        return protocol.read_body_bytes()
1016
1034
 
1017
1035
    def _serialise_optional_mode(self, mode):
1018
1036
        if mode is None:
1021
1039
            return '%d' % mode
1022
1040
 
1023
1041
    def mkdir(self, relpath, mode=None):
1024
 
        resp = self._client._call('mkdir', 
1025
 
                                  self._remote_path(relpath), 
1026
 
                                  self._serialise_optional_mode(mode))
 
1042
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1043
            self._serialise_optional_mode(mode))
1027
1044
        self._translate_error(resp)
1028
1045
 
1029
1046
    def put_bytes(self, relpath, upload_contents, mode=None):
1030
1047
        # FIXME: upload_file is probably not safe for non-ascii characters -
1031
1048
        # should probably just pass all parameters as length-delimited
1032
1049
        # strings?
1033
 
        resp = self._client._call_with_upload(
1034
 
            'put',
 
1050
        resp = self._call_with_body_bytes('put',
1035
1051
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
1036
1052
            upload_contents)
1037
1053
        self._translate_error(resp)
1045
1061
        if create_parent_dir:
1046
1062
            create_parent_str = 'T'
1047
1063
 
1048
 
        resp = self._client._call_with_upload(
 
1064
        resp = self._call_with_body_bytes(
1049
1065
            'put_non_atomic',
1050
1066
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
1051
1067
             create_parent_str, self._serialise_optional_mode(dir_mode)),
1074
1090
        return self.append_bytes(relpath, from_file.read(), mode)
1075
1091
        
1076
1092
    def append_bytes(self, relpath, bytes, mode=None):
1077
 
        resp = self._client._call_with_upload(
 
1093
        resp = self._call_with_body_bytes(
1078
1094
            'append',
1079
1095
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
1080
1096
            bytes)
1083
1099
        self._translate_error(resp)
1084
1100
 
1085
1101
    def delete(self, relpath):
1086
 
        resp = self._client._call('delete', self._remote_path(relpath))
 
1102
        resp = self._call2('delete', self._remote_path(relpath))
1087
1103
        self._translate_error(resp)
1088
1104
 
1089
1105
    def readv(self, relpath, offsets):
1100
1116
                               limit=self._max_readv_combine,
1101
1117
                               fudge_factor=self._bytes_to_read_before_seek))
1102
1118
 
1103
 
 
1104
 
        resp = self._client._call_with_upload(
1105
 
            'readv',
1106
 
            (self._remote_path(relpath),),
1107
 
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
1119
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1120
        protocol.call_with_body_readv_array(
 
1121
            ('readv', self._remote_path(relpath)),
 
1122
            [(c.start, c.length) for c in coalesced])
 
1123
        resp = protocol.read_response_tuple(True)
1108
1124
 
1109
1125
        if resp[0] != 'readv':
1110
1126
            # This should raise an exception
 
1127
            protocol.cancel_read_body()
1111
1128
            self._translate_error(resp)
1112
1129
            return
1113
1130
 
1114
 
        data = self._client._recv_bulk()
 
1131
        # FIXME: this should know how many bytes are needed, for clarity.
 
1132
        data = protocol.read_body_bytes()
1115
1133
        # Cache the results, but only until they have been fulfilled
1116
1134
        data_map = {}
1117
1135
        for c_offset in coalesced:
1130
1148
                cur_offset_and_size = offset_stack.next()
1131
1149
 
1132
1150
    def rename(self, rel_from, rel_to):
1133
 
        self._call('rename', 
 
1151
        self._call('rename',
1134
1152
                   self._remote_path(rel_from),
1135
1153
                   self._remote_path(rel_to))
1136
1154
 
1137
1155
    def move(self, rel_from, rel_to):
1138
 
        self._call('move', 
 
1156
        self._call('move',
1139
1157
                   self._remote_path(rel_from),
1140
1158
                   self._remote_path(rel_to))
1141
1159
 
1142
1160
    def rmdir(self, relpath):
1143
1161
        resp = self._call('rmdir', self._remote_path(relpath))
1144
1162
 
1145
 
    def _call(self, method, *args):
1146
 
        resp = self._client._call(method, *args)
1147
 
        self._translate_error(resp)
1148
 
 
1149
1163
    def _translate_error(self, resp, orig_path=None):
1150
1164
        """Raise an exception from a response"""
1151
1165
        if resp is None:
1188
1202
        else:
1189
1203
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1190
1204
 
1191
 
    def _send_tuple(self, args):
1192
 
        self._client.accept_bytes(_encode_tuple(args))
1193
 
 
1194
 
    def _recv_tuple(self):
1195
 
        return self._client._recv_tuple()
1196
 
 
1197
1205
    def disconnect(self):
1198
 
        self._client.disconnect()
 
1206
        self._medium.disconnect()
1199
1207
 
1200
1208
    def delete_tree(self, relpath):
1201
1209
        raise errors.TransportNotPossible('readonly transport')
1202
1210
 
1203
1211
    def stat(self, relpath):
1204
 
        resp = self._client._call('stat', self._remote_path(relpath))
 
1212
        resp = self._call2('stat', self._remote_path(relpath))
1205
1213
        if resp[0] == 'stat':
1206
1214
            return SmartStat(int(resp[1]), int(resp[2], 8))
1207
1215
        else:
1224
1232
        return True
1225
1233
 
1226
1234
    def list_dir(self, relpath):
1227
 
        resp = self._client._call('list_dir',
1228
 
                                  self._remote_path(relpath))
 
1235
        resp = self._call2('list_dir', self._remote_path(relpath))
1229
1236
        if resp[0] == 'names':
1230
1237
            return [name.encode('ascii') for name in resp[1:]]
1231
1238
        else:
1232
1239
            self._translate_error(resp)
1233
1240
 
1234
1241
    def iter_files_recursive(self):
1235
 
        resp = self._client._call('iter_files_recursive',
1236
 
                                  self._remote_path(''))
 
1242
        resp = self._call2('iter_files_recursive', self._remote_path(''))
1237
1243
        if resp[0] == 'names':
1238
1244
            return resp[1:]
1239
1245
        else:
1240
1246
            self._translate_error(resp)
1241
1247
 
1242
1248
 
1243
 
class SmartStreamClient(SmartProtocolBase):
1244
 
    """Connection to smart server over two streams"""
1245
 
 
1246
 
    def _send_bulk_data(self, body):
1247
 
        self._ensure_connection()
1248
 
        SmartProtocolBase._send_bulk_data(self, body)
1249
 
        
1250
 
    def disconnect(self):
1251
 
        """If this medium maintains a persistent connection, close it.
1252
 
        
1253
 
        The default implementation does nothing.
1254
 
        """
1255
 
        
1256
 
    def _call(self, *args):
1257
 
        bytes = _encode_tuple(args)
1258
 
        # should be self.medium.accept_bytes(bytes) XXX
1259
 
        self.accept_bytes(bytes)
1260
 
        return self._recv_tuple()
1261
 
 
1262
 
    def _call_with_upload(self, method, args, body):
1263
 
        """Call an rpc, supplying bulk upload data.
1264
 
 
1265
 
        :param method: method name to call
1266
 
        :param args: parameter args tuple
1267
 
        :param body: upload body as a byte string
1268
 
        """
1269
 
        bytes = _encode_tuple((method,) + args)
1270
 
        bytes += self._encode_bulk_data(body)
1271
 
        # should be self.medium.accept_bytes XXX
1272
 
        self.accept_bytes(bytes)
1273
 
        return self._recv_tuple()
 
1249
class SmartClientMediumRequest(object):
 
1250
    """A request on a SmartClientMedium.
 
1251
 
 
1252
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1253
    the response bytes to be read via read_bytes.
 
1254
 
 
1255
    For instance:
 
1256
    request.accept_bytes('123')
 
1257
    request.finished_writing()
 
1258
    result = request.read_bytes(3)
 
1259
    request.finished_reading()
 
1260
 
 
1261
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1262
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1263
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1264
    details on concurrency and pipelining.
 
1265
    """
 
1266
 
 
1267
    def __init__(self, medium):
 
1268
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1269
        self._medium = medium
 
1270
        # we track state by constants - we may want to use the same
 
1271
        # pattern as BodyReader if it gets more complex.
 
1272
        # valid states are: "writing", "reading", "done"
 
1273
        self._state = "writing"
 
1274
 
 
1275
    def accept_bytes(self, bytes):
 
1276
        """Accept bytes for inclusion in this request.
 
1277
 
 
1278
        This method may not be be called after finished_writing() has been
 
1279
        called.  It depends upon the Medium whether or not the bytes will be
 
1280
        immediately transmitted. Message based Mediums will tend to buffer the
 
1281
        bytes until finished_writing() is called.
 
1282
 
 
1283
        :param bytes: A bytestring.
 
1284
        """
 
1285
        if self._state != "writing":
 
1286
            raise errors.WritingCompleted(self)
 
1287
        self._accept_bytes(bytes)
 
1288
 
 
1289
    def _accept_bytes(self, bytes):
 
1290
        """Helper for accept_bytes.
 
1291
 
 
1292
        Accept_bytes checks the state of the request to determing if bytes
 
1293
        should be accepted. After that it hands off to _accept_bytes to do the
 
1294
        actual acceptance.
 
1295
        """
 
1296
        raise NotImplementedError(self._accept_bytes)
 
1297
 
 
1298
    def finished_reading(self):
 
1299
        """Inform the request that all desired data has been read.
 
1300
 
 
1301
        This will remove the request from the pipeline for its medium (if the
 
1302
        medium supports pipelining) and any further calls to methods on the
 
1303
        request will raise ReadingCompleted.
 
1304
        """
 
1305
        if self._state == "writing":
 
1306
            raise errors.WritingNotComplete(self)
 
1307
        if self._state != "reading":
 
1308
            raise errors.ReadingCompleted(self)
 
1309
        self._state = "done"
 
1310
        self._finished_reading()
 
1311
 
 
1312
    def _finished_reading(self):
 
1313
        """Helper for finished_reading.
 
1314
 
 
1315
        finished_reading checks the state of the request to determine if 
 
1316
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1317
        to perform the action.
 
1318
        """
 
1319
        raise NotImplementedError(self._finished_reading)
 
1320
 
 
1321
    def finished_writing(self):
 
1322
        """Finish the writing phase of this request.
 
1323
 
 
1324
        This will flush all pending data for this request along the medium.
 
1325
        After calling finished_writing, you may not call accept_bytes anymore.
 
1326
        """
 
1327
        if self._state != "writing":
 
1328
            raise errors.WritingCompleted(self)
 
1329
        self._state = "reading"
 
1330
        self._finished_writing()
 
1331
 
 
1332
    def _finished_writing(self):
 
1333
        """Helper for finished_writing.
 
1334
 
 
1335
        finished_writing checks the state of the request to determine if 
 
1336
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1337
        to perform the action.
 
1338
        """
 
1339
        raise NotImplementedError(self._finished_writing)
 
1340
 
 
1341
    def read_bytes(self, count):
 
1342
        """Read bytes from this requests response.
 
1343
 
 
1344
        This method will block and wait for count bytes to be read. It may not
 
1345
        be invoked until finished_writing() has been called - this is to ensure
 
1346
        a message-based approach to requests, for compatability with message
 
1347
        based mediums like HTTP.
 
1348
        """
 
1349
        if self._state == "writing":
 
1350
            raise errors.WritingNotComplete(self)
 
1351
        if self._state != "reading":
 
1352
            raise errors.ReadingCompleted(self)
 
1353
        return self._read_bytes(count)
 
1354
 
 
1355
    def _read_bytes(self, count):
 
1356
        """Helper for read_bytes.
 
1357
 
 
1358
        read_bytes checks the state of the request to determing if bytes
 
1359
        should be read. After that it hands off to _read_bytes to do the
 
1360
        actual read.
 
1361
        """
 
1362
        raise NotImplementedError(self._read_bytes)
 
1363
 
 
1364
 
 
1365
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1366
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1367
 
 
1368
    def __init__(self, medium):
 
1369
        SmartClientMediumRequest.__init__(self, medium)
 
1370
        # check that we are safe concurrency wise. If some streams start
 
1371
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1372
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1373
        # and the setting/unsetting of _current_request likewise moved into
 
1374
        # that class : but its unneeded overhead for now. RBC 20060922
 
1375
        if self._medium._current_request is not None:
 
1376
            raise errors.TooManyConcurrentRequests(self._medium)
 
1377
        self._medium._current_request = self
 
1378
 
 
1379
    def _accept_bytes(self, bytes):
 
1380
        """See SmartClientMediumRequest._accept_bytes.
 
1381
        
 
1382
        This forwards to self._medium._accept_bytes because we are operating
 
1383
        on the mediums stream.
 
1384
        """
 
1385
        self._medium._accept_bytes(bytes)
 
1386
 
 
1387
    def _finished_reading(self):
 
1388
        """See SmartClientMediumRequest._finished_reading.
 
1389
 
 
1390
        This clears the _current_request on self._medium to allow a new 
 
1391
        request to be created.
 
1392
        """
 
1393
        assert self._medium._current_request is self
 
1394
        self._medium._current_request = None
 
1395
        
 
1396
    def _finished_writing(self):
 
1397
        """See SmartClientMediumRequest._finished_writing.
 
1398
 
 
1399
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1400
        """
 
1401
        self._medium._flush()
 
1402
 
 
1403
    def _read_bytes(self, count):
 
1404
        """See SmartClientMediumRequest._read_bytes.
 
1405
        
 
1406
        This forwards to self._medium._read_bytes because we are operating
 
1407
        on the mediums stream.
 
1408
        """
 
1409
        return self._medium._read_bytes(count)
 
1410
 
 
1411
 
 
1412
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1413
    """The client-side protocol for smart version 1."""
 
1414
 
 
1415
    def __init__(self, request):
 
1416
        """Construct a SmartClientRequestProtocolOne.
 
1417
 
 
1418
        :param request: A SmartClientMediumRequest to serialise onto and
 
1419
            deserialise from.
 
1420
        """
 
1421
        self._request = request
 
1422
        self._body_buffer = None
 
1423
 
 
1424
    def call(self, *args):
 
1425
        bytes = _encode_tuple(args)
 
1426
        self._request.accept_bytes(bytes)
 
1427
        self._request.finished_writing()
 
1428
 
 
1429
    def call_with_body_bytes(self, args, body):
 
1430
        """Make a remote call of args with body bytes 'body'.
 
1431
 
 
1432
        After calling this, call read_response_tuple to find the result out.
 
1433
        """
 
1434
        bytes = _encode_tuple(args)
 
1435
        self._request.accept_bytes(bytes)
 
1436
        bytes = self._encode_bulk_data(body)
 
1437
        self._request.accept_bytes(bytes)
 
1438
        self._request.finished_writing()
 
1439
 
 
1440
    def call_with_body_readv_array(self, args, body):
 
1441
        """Make a remote call with a readv array.
 
1442
 
 
1443
        The body is encoded with one line per readv offset pair. The numbers in
 
1444
        each pair are separated by a comma, and no trailing \n is emitted.
 
1445
        """
 
1446
        bytes = _encode_tuple(args)
 
1447
        self._request.accept_bytes(bytes)
 
1448
        readv_bytes = self._serialise_offsets(body)
 
1449
        bytes = self._encode_bulk_data(readv_bytes)
 
1450
        self._request.accept_bytes(bytes)
 
1451
        self._request.finished_writing()
 
1452
 
 
1453
    def cancel_read_body(self):
 
1454
        """After expecting a body, a response code may indicate one otherwise.
 
1455
 
 
1456
        This method lets the domain client inform the protocol that no body
 
1457
        will be transmitted. This is a terminal method: after calling it the
 
1458
        protocol is not able to be used further.
 
1459
        """
 
1460
        self._request.finished_reading()
 
1461
 
 
1462
    def read_response_tuple(self, expect_body=False):
 
1463
        """Read a response tuple from the wire.
 
1464
 
 
1465
        This should only be called once.
 
1466
        """
 
1467
        result = self._recv_tuple()
 
1468
        if not expect_body:
 
1469
            self._request.finished_reading()
 
1470
        return result
 
1471
 
 
1472
    def read_body_bytes(self, count=-1):
 
1473
        """Read bytes from the body, decoding into a byte stream.
 
1474
        
 
1475
        We read all bytes at once to ensure we've checked the trailer for 
 
1476
        errors, and then feed the buffer back as read_body_bytes is called.
 
1477
        """
 
1478
        if self._body_buffer is not None:
 
1479
            return self._body_buffer.read(count)
 
1480
        _body_decoder = LengthPrefixedBodyDecoder()
 
1481
        # grab a byte from the wire: we do this so that we dont use too much
 
1482
        # from the wire; we should have the LengthPrefixedBodyDecoder tell
 
1483
        # us how much is needed once the header is written.
 
1484
        # i.e. self._body_decoder.next_read_size() would be a hint. 
 
1485
        while not _body_decoder.finished_reading:
 
1486
            byte = self._request.read_bytes(1)
 
1487
            _body_decoder.accept_bytes(byte)
 
1488
        self._request.finished_reading()
 
1489
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1490
        # XXX: TODO check the trailer result.
 
1491
        return self._body_buffer.read(count)
 
1492
 
 
1493
    def _recv_tuple(self):
 
1494
        """Recieve a tuple from the medium request."""
 
1495
        line = ''
 
1496
        while not line or line[-1] != '\n':
 
1497
            # yes, this is inefficient - but tuples are short.
 
1498
            new_char = self._request.read_bytes(1)
 
1499
            line += new_char
 
1500
            assert new_char != '', "end of file reading from server."
 
1501
        return _decode_tuple(line)
1274
1502
 
1275
1503
    def query_version(self):
1276
1504
        """Return protocol version number of the server."""
1277
 
        resp = self._call('hello')
 
1505
        self.call('hello')
 
1506
        resp = self.read_response_tuple()
1278
1507
        if resp == ('ok', '1'):
1279
1508
            return 1
1280
1509
        else:
1281
1510
            raise errors.SmartProtocolError("bad response %r" % (resp,))
1282
1511
 
1283
1512
 
1284
 
class SmartClientMedium(SmartStreamClient):
1285
 
    """Smart client is a medium for sending smart protocol requests over.
1286
 
 
1287
 
    XXX: we want explicit finalisation
1288
 
    """
1289
 
 
1290
 
 
1291
 
class SmartStreamMediumClient(SmartClientMedium):
1292
 
    """The SmartStreamMediumClient knows how to close the stream when it is
1293
 
    finished with it.
1294
 
    """
 
1513
class SmartClientMedium(object):
 
1514
    """Smart client is a medium for sending smart protocol requests over."""
 
1515
 
 
1516
    def disconnect(self):
 
1517
        """If this medium maintains a persistent connection, close it.
 
1518
        
 
1519
        The default implementation does nothing.
 
1520
        """
 
1521
        
 
1522
 
 
1523
class SmartClientStreamMedium(SmartClientMedium):
 
1524
    """Stream based medium common class.
 
1525
 
 
1526
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1527
    SmartClientStreamMediumRequest for their requests, and should implement
 
1528
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1529
    receive bytes.
 
1530
    """
 
1531
 
 
1532
    def __init__(self):
 
1533
        self._current_request = None
 
1534
 
 
1535
    def accept_bytes(self, bytes):
 
1536
        self._accept_bytes(bytes)
1295
1537
 
1296
1538
    def __del__(self):
 
1539
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1540
        finished with it.
 
1541
        """
1297
1542
        self.disconnect()
1298
1543
 
1299
 
 
1300
 
class SmartSimplePipesClientMedium(SmartClientMedium):
 
1544
    def _flush(self):
 
1545
        """Flush the output stream.
 
1546
        
 
1547
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1548
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1549
        """
 
1550
        raise NotImplementedError(self._flush)
 
1551
 
 
1552
    def get_request(self):
 
1553
        """See SmartClientMedium.get_request().
 
1554
 
 
1555
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1556
        for get_request.
 
1557
        """
 
1558
        return SmartClientStreamMediumRequest(self)
 
1559
 
 
1560
    def read_bytes(self, count):
 
1561
        return self._read_bytes(count)
 
1562
 
 
1563
 
 
1564
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1301
1565
    """A client medium using simple pipes.
1302
1566
    
1303
1567
    This client does not manage the pipes: it assumes they will always be open.
1304
1568
    """
1305
1569
 
1306
1570
    def __init__(self, readable_pipe, writeable_pipe):
 
1571
        SmartClientStreamMedium.__init__(self)
1307
1572
        self._readable_pipe = readable_pipe
1308
1573
        self._writeable_pipe = writeable_pipe
1309
1574
 
1310
 
    def accept_bytes(self, bytes):
1311
 
        """See SmartClientMedium.accept_bytes."""
 
1575
    def _accept_bytes(self, bytes):
 
1576
        """See SmartClientStreamMedium.accept_bytes."""
1312
1577
        self._writeable_pipe.write(bytes)
1313
1578
 
1314
 
    def read_bytes(self, count):
1315
 
        """See SmartClientMedium.read_bytes."""
 
1579
    def _flush(self):
 
1580
        """See SmartClientStreamMedium._flush()."""
 
1581
        self._writeable_pipe.flush()
 
1582
 
 
1583
    def _read_bytes(self, count):
 
1584
        """See SmartClientStreamMedium._read_bytes."""
1316
1585
        return self._readable_pipe.read(count)
1317
1586
 
1318
 
    def _recv_bulk(self):
1319
 
        """transitional api from 'client' to 'medium'."""
1320
 
        self._in = self._readable_pipe
1321
 
        try:
1322
 
            return SmartClientMedium._recv_bulk(self)
1323
 
        finally:
1324
 
            self._in = None
1325
 
 
1326
 
    def _recv_tuple(self):
1327
 
        """transitional api from 'client' to 'medium'."""
1328
 
        return _recv_tuple(self._readable_pipe)
1329
 
 
1330
 
    def _write_and_flush(self, bytes, file=None):
1331
 
        """Thunk from the 'client' api to the 'Medium' api."""
1332
 
        assert file is None
1333
 
        self.accept_bytes(bytes)
1334
 
 
1335
 
 
1336
 
class SmartSSHClientMedium(SmartStreamMediumClient):
 
1587
 
 
1588
class SmartSSHClientMedium(SmartClientStreamMedium):
1337
1589
    """A client medium using SSH."""
1338
1590
    
1339
1591
    def __init__(self, host, port=None, username=None, password=None,
1343
1595
        :param vendor: An optional override for the ssh vendor to use. See
1344
1596
            bzrlib.transport.ssh for details on ssh vendors.
1345
1597
        """
 
1598
        SmartClientStreamMedium.__init__(self)
1346
1599
        self._connected = False
1347
1600
        self._host = host
1348
1601
        self._password = password
1353
1606
        self._vendor = vendor
1354
1607
        self._write_to = None
1355
1608
 
1356
 
    def accept_bytes(self, bytes):
1357
 
        """See SmartClientMedium.accept_bytes."""
 
1609
    def _accept_bytes(self, bytes):
 
1610
        """See SmartClientStreamMedium.accept_bytes."""
1358
1611
        self._ensure_connection()
1359
1612
        self._write_to.write(bytes)
1360
 
        self._write_to.flush()
1361
1613
 
1362
1614
    def disconnect(self):
1363
1615
        """See SmartClientMedium.disconnect()."""
1385
1637
            self._ssh_connection.get_filelike_channels()
1386
1638
        self._connected = True
1387
1639
 
1388
 
    def read_bytes(self, count):
1389
 
        """See SmartClientMedium.read_bytes."""
1390
 
        raise errors.MediumNotConnected(self)
1391
 
 
1392
 
    def _recv_bulk(self):
1393
 
        """transitional api from 'client' to 'medium'."""
1394
 
        self._in = self._read_from
1395
 
        try:
1396
 
            return SmartStreamMediumClient._recv_bulk(self)
1397
 
        finally:
1398
 
            self._in = None
1399
 
 
1400
 
    def _recv_tuple(self):
1401
 
        """transitional api from 'client' to 'medium'."""
1402
 
        return _recv_tuple(self._read_from)
1403
 
 
1404
 
    def _write_and_flush(self, bytes, file=None):
1405
 
        """Thunk from the 'client' api to the 'Medium' api."""
1406
 
        assert file is None
1407
 
        self.accept_bytes(bytes)
1408
 
 
1409
 
 
1410
 
class SmartTCPClientMedium(SmartStreamMediumClient):
 
1640
    def _flush(self):
 
1641
        """See SmartClientStreamMedium._flush()."""
 
1642
        self._write_to.flush()
 
1643
 
 
1644
    def _read_bytes(self, count):
 
1645
        """See SmartClientStreamMedium.read_bytes."""
 
1646
        if not self._connected:
 
1647
            raise errors.MediumNotConnected(self)
 
1648
        return self._read_from.read(count)
 
1649
 
 
1650
 
 
1651
class SmartTCPClientMedium(SmartClientStreamMedium):
1411
1652
    """A client medium using TCP."""
1412
1653
    
1413
1654
    def __init__(self, host, port):
1414
1655
        """Creates a client that will connect on the first use."""
 
1656
        SmartClientStreamMedium.__init__(self)
1415
1657
        self._connected = False
1416
1658
        self._host = host
1417
1659
        self._port = port
1418
1660
        self._socket = None
1419
1661
 
1420
 
    def accept_bytes(self, bytes):
 
1662
    def _accept_bytes(self, bytes):
1421
1663
        """See SmartClientMedium.accept_bytes."""
1422
1664
        self._ensure_connection()
1423
1665
        self._socket.sendall(bytes)
1442
1684
                    (self._host, self._port, os.strerror(result)))
1443
1685
        self._connected = True
1444
1686
 
1445
 
    def read_bytes(self, count):
 
1687
    def _flush(self):
 
1688
        """See SmartClientStreamMedium._flush().
 
1689
        
 
1690
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1691
        add a means to do a flush, but that can be done in the future.
 
1692
        """
 
1693
 
 
1694
    def _read_bytes(self, count):
1446
1695
        """See SmartClientMedium.read_bytes."""
1447
 
        raise errors.MediumNotConnected(self)
1448
 
 
1449
 
    def _recv_bulk(self):
1450
 
        """transitional api from 'client' to 'medium'."""
1451
 
        self._in = self._socket.makefile('r', 0)
1452
 
        try:
1453
 
            return SmartStreamMediumClient._recv_bulk(self)
1454
 
        finally:
1455
 
            self._in = None
1456
 
 
1457
 
    def _recv_tuple(self):
1458
 
        """transitional api from 'client' to 'medium'."""
1459
 
        return _recv_tuple(self._socket.makefile('r', 0))
1460
 
 
1461
 
    def _write_and_flush(self, bytes, file=None):
1462
 
        """Thunk from the 'client' api to the 'Medium' api."""
1463
 
        assert file is None
1464
 
        self.accept_bytes(bytes)
 
1696
        if not self._connected:
 
1697
            raise errors.MediumNotConnected(self)
 
1698
        return self._socket.recv(count)
1465
1699
 
1466
1700
 
1467
1701
class SmartTCPTransport(SmartTransport):