1226
1234
def list_dir(self, relpath):
1227
resp = self._client._call('list_dir',
1228
self._remote_path(relpath))
1235
resp = self._call2('list_dir', self._remote_path(relpath))
1229
1236
if resp[0] == 'names':
1230
1237
return [name.encode('ascii') for name in resp[1:]]
1232
1239
self._translate_error(resp)
1234
1241
def iter_files_recursive(self):
1235
resp = self._client._call('iter_files_recursive',
1236
self._remote_path(''))
1242
resp = self._call2('iter_files_recursive', self._remote_path(''))
1237
1243
if resp[0] == 'names':
1238
1244
return resp[1:]
1240
1246
self._translate_error(resp)
1243
class SmartStreamClient(SmartProtocolBase):
1244
"""Connection to smart server over two streams"""
1246
def _send_bulk_data(self, body):
1247
self._ensure_connection()
1248
SmartProtocolBase._send_bulk_data(self, body)
1250
def disconnect(self):
1251
"""If this medium maintains a persistent connection, close it.
1253
The default implementation does nothing.
1256
def _call(self, *args):
1257
bytes = _encode_tuple(args)
1258
# should be self.medium.accept_bytes(bytes) XXX
1259
self.accept_bytes(bytes)
1260
return self._recv_tuple()
1262
def _call_with_upload(self, method, args, body):
1263
"""Call an rpc, supplying bulk upload data.
1265
:param method: method name to call
1266
:param args: parameter args tuple
1267
:param body: upload body as a byte string
1269
bytes = _encode_tuple((method,) + args)
1270
bytes += self._encode_bulk_data(body)
1271
# should be self.medium.accept_bytes XXX
1272
self.accept_bytes(bytes)
1273
return self._recv_tuple()
1249
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    Typical usage::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.

    Subclasses supply the transport-specific behaviour by overriding
    _accept_bytes, _finished_writing, _read_bytes and _finished_reading;
    the public methods here only enforce the request's state machine.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted, and then hands off to _accept_bytes. This base
        implementation always raises NotImplementedError.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if the request is still in the
            "writing" state.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # The state moves to "done" before the helper runs, so the request
        # stays closed even if the helper raises.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to
        _finished_reading to perform the action. This base implementation
        always raises NotImplementedError.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is not in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        # The state moves to "reading" before the helper runs.
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to
        _finished_writing to perform the action. This base implementation
        always raises NotImplementedError.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :param count: the number of bytes to read.
        :return: whatever the subclass's _read_bytes returns for count.
        :raises errors.WritingNotComplete: if the request is still in the
            "writing" state.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read, and then hands off to _read_bytes. This base
        implementation always raises NotImplementedError.
        """
        raise NotImplementedError(self._read_bytes)
1365
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        """Register this request as the medium's current request.

        :param medium: the stream medium this request operates on; its
            _current_request attribute is read and then set to this request.
        :raises errors.TooManyConcurrentRequests: if the medium already has
            a current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)
1412
class SmartClientRequestProtocolOne(SmartProtocolBase):
1413
"""The client-side protocol for smart version 1."""
1415
def __init__(self, request):
1416
"""Construct a SmartClientRequestProtocolOne.
1418
:param request: A SmartClientMediumRequest to serialise onto and
1421
self._request = request
1422
self._body_buffer = None
1424
def call(self, *args):
1425
bytes = _encode_tuple(args)
1426
self._request.accept_bytes(bytes)
1427
self._request.finished_writing()
1429
def call_with_body_bytes(self, args, body):
1430
"""Make a remote call of args with body bytes 'body'.
1432
After calling this, call read_response_tuple to find the result out.
1434
bytes = _encode_tuple(args)
1435
self._request.accept_bytes(bytes)
1436
bytes = self._encode_bulk_data(body)
1437
self._request.accept_bytes(bytes)
1438
self._request.finished_writing()
1440
def call_with_body_readv_array(self, args, body):
1441
"""Make a remote call with a readv array.
1443
The body is encoded with one line per readv offset pair. The numbers in
1444
each pair are separated by a comma, and no trailing \n is emitted.
1446
bytes = _encode_tuple(args)
1447
self._request.accept_bytes(bytes)
1448
readv_bytes = self._serialise_offsets(body)
1449
bytes = self._encode_bulk_data(readv_bytes)
1450
self._request.accept_bytes(bytes)
1451
self._request.finished_writing()
1453
def cancel_read_body(self):
1454
"""After expecting a body, a response code may indicate one otherwise.
1456
This method lets the domain client inform the protocol that no body
1457
will be transmitted. This is a terminal method: after calling it the
1458
protocol is not able to be used further.
1460
self._request.finished_reading()
1462
def read_response_tuple(self, expect_body=False):
1463
"""Read a response tuple from the wire.
1465
This should only be called once.
1467
result = self._recv_tuple()
1469
self._request.finished_reading()
1472
def read_body_bytes(self, count=-1):
1473
"""Read bytes from the body, decoding into a byte stream.
1475
We read all bytes at once to ensure we've checked the trailer for
1476
errors, and then feed the buffer back as read_body_bytes is called.
1478
if self._body_buffer is not None:
1479
return self._body_buffer.read(count)
1480
_body_decoder = LengthPrefixedBodyDecoder()
1481
# grab a byte from the wire: we do this so that we don't use too much
1482
# from the wire; we should have the LengthPrefixedBodyDecoder tell
1483
# us how much is needed once the header is written.
1484
# i.e. self._body_decoder.next_read_size() would be a hint.
1485
while not _body_decoder.finished_reading:
1486
byte = self._request.read_bytes(1)
1487
_body_decoder.accept_bytes(byte)
1488
self._request.finished_reading()
1489
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1490
# XXX: TODO check the trailer result.
1491
return self._body_buffer.read(count)
1493
def _recv_tuple(self):
1494
"""Receive a tuple from the medium request."""
1496
while not line or line[-1] != '\n':
1497
# yes, this is inefficient - but tuples are short.
1498
new_char = self._request.read_bytes(1)
1500
assert new_char != '', "end of file reading from server."
1501
return _decode_tuple(line)
1275
1503
def query_version(self):
1276
1504
"""Return protocol version number of the server."""
1277
resp = self._call('hello')
1506
resp = self.read_response_tuple()
1278
1507
if resp == ('ok', '1'):
1281
1510
raise errors.SmartProtocolError("bad response %r" % (resp,))
1284
class SmartClientMedium(SmartStreamClient):
1285
"""Smart client is a medium for sending smart protocol requests over.
1287
XXX: we want explicit finalisation
1291
class SmartStreamMediumClient(SmartClientMedium):
1292
"""The SmartStreamMediumClient knows how to close the stream when it is
1513
class SmartClientMedium(object):
1514
"""Smart client is a medium for sending smart protocol requests over."""
1516
def disconnect(self):
1517
"""If this medium maintains a persistent connection, close it.
1519
The default implementation does nothing.
1523
class SmartClientStreamMedium(SmartClientMedium):
1524
"""Stream based medium common class.
1526
SmartClientStreamMediums operate on a stream. All subclasses use a common
1527
SmartClientStreamMediumRequest for their requests, and should implement
1528
_accept_bytes and _read_bytes to allow the request objects to send and
1533
self._current_request = None
1535
def accept_bytes(self, bytes):
1536
self._accept_bytes(bytes)
1296
1538
def __del__(self):
1539
"""The SmartClientStreamMedium knows how to close the stream when it is
1297
1542
self.disconnect()
1300
class SmartSimplePipesClientMedium(SmartClientMedium):
1545
"""Flush the output stream.
1547
This method is used by the SmartClientStreamMediumRequest to ensure that
1548
all data for a request is sent, to avoid long timeouts or deadlocks.
1550
raise NotImplementedError(self._flush)
1552
def get_request(self):
1553
"""See SmartClientMedium.get_request().
1555
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1558
return SmartClientStreamMediumRequest(self)
1560
def read_bytes(self, count):
1561
return self._read_bytes(count)
1564
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1301
1565
"""A client medium using simple pipes.
1303
1567
This client does not manage the pipes: it assumes they will always be open.
1306
1570
def __init__(self, readable_pipe, writeable_pipe):
1571
SmartClientStreamMedium.__init__(self)
1307
1572
self._readable_pipe = readable_pipe
1308
1573
self._writeable_pipe = writeable_pipe
1310
def accept_bytes(self, bytes):
1311
"""See SmartClientMedium.accept_bytes."""
1575
def _accept_bytes(self, bytes):
1576
"""See SmartClientStreamMedium.accept_bytes."""
1312
1577
self._writeable_pipe.write(bytes)
1314
def read_bytes(self, count):
1315
"""See SmartClientMedium.read_bytes."""
1580
"""See SmartClientStreamMedium._flush()."""
1581
self._writeable_pipe.flush()
1583
def _read_bytes(self, count):
1584
"""See SmartClientStreamMedium._read_bytes."""
1316
1585
return self._readable_pipe.read(count)
1318
def _recv_bulk(self):
1319
"""transitional api from 'client' to 'medium'."""
1320
self._in = self._readable_pipe
1322
return SmartClientMedium._recv_bulk(self)
1326
def _recv_tuple(self):
1327
"""transitional api from 'client' to 'medium'."""
1328
return _recv_tuple(self._readable_pipe)
1330
def _write_and_flush(self, bytes, file=None):
1331
"""Thunk from the 'client' api to the 'Medium' api."""
1333
self.accept_bytes(bytes)
1336
class SmartSSHClientMedium(SmartStreamMediumClient):
1588
class SmartSSHClientMedium(SmartClientStreamMedium):
1337
1589
"""A client medium using SSH."""
1339
1591
def __init__(self, host, port=None, username=None, password=None,