1346
1347
class RemoteStreamSink(repository.StreamSink):
1349
def __init__(self, target_repo):
    """Create a sink that inserts streams into ``target_repo``.

    :param target_repo: the repository this sink writes to; it is passed
        unchanged to ``repository.StreamSink.__init__``.
    """
    repository.StreamSink.__init__(self, target_repo)
    # Tokens handed back by the server in a 'missing-basis' response.
    # insert_stream() joins them with spaces and sends them along with the
    # next request; an empty list means nothing is pending.
    self._resume_tokens = []
1348
1353
def _insert_real(self, stream, src_format):
    """Insert ``stream`` through the real repository's own sink.

    Used as the fallback path when the stream cannot be handed to the
    smart server.

    :param stream: the stream to insert; passed through untouched.
    :param src_format: the format of the source repository; passed through
        untouched.
    :return: whatever the real repository's sink returns from
        ``insert_stream``.
    """
    target = self.target_repo
    # presumably populates target._real_repository -- verify against
    # _ensure_real's definition, which is not visible from here.
    target._ensure_real()
    sink = target._real_repository._get_sink()
    # Keep the result instead of returning straight away: an early return
    # here would make the autopack call below unreachable.
    result = sink.insert_stream(stream, src_format)
    # NOTE(review): the listing this method was recovered from skips one
    # line between the insert and the autopack call; confirm whether
    # autopack was meant to be conditional on ``result``.
    target.autopack()
    return result
1353
1361
def insert_stream(self, stream, src_format):
    """Insert ``stream`` into the target repository over the smart protocol.

    Falls back to the real repository's sink (``_insert_real``) when the
    remote side is known to be older than 1.13, or when it turns out not to
    know the ``Repository.insert_stream`` verb.

    :param stream: the stream to insert.
    :param src_format: the format of the source repository.
    :return: on a 'missing-basis' response, the ``(tokens, missing_keys)``
        pair decoded from the server's reply; the tokens are also kept on
        this sink so the next call sends them back.  Fallback paths return
        whatever ``_insert_real`` returns.
    :raises errors.UnexpectedSmartServerResponse: if the server answers
        with anything other than 'ok' or 'missing-basis'.
    """
    repo = self.target_repo
    client = repo._client
    medium = client._medium
    if medium._is_remote_before((1, 13)):
        # No possible way this can work.
        return self._insert_real(stream, src_format)
    path = repo.bzrdir._path_for_remote_call(client)
    # Computed before the trial push so that both requests send the same
    # argument; when no tokens are pending this is the empty string.
    resume_tokens = ' '.join(self._resume_tokens)
    if not self._resume_tokens:
        # XXX: Ugly but important for correctness, *will* be fixed during
        # 1.13 cycle. Pushing a stream that is interrupted results in a
        # fallback to the _real_repositories sink *with a partial stream*.
        # That's bad because we insert less data than bzr expected. To
        # avoid this we do a trial push to make sure the verb is
        # accessible, and do not fallback when actually pushing the stream.
        # A cleanup patch is going to look at rewinding/restarting the
        # stream/partial buffering etc.
        probe_stream = self._stream_to_byte_stream([], src_format)
        try:
            client.call_with_body_stream(
                ('Repository.insert_stream', path, resume_tokens),
                probe_stream)
        except errors.UnknownSmartMethod:
            # Remember the answer so later calls skip the probe entirely.
            medium._remember_remote_is_before((1, 13))
            return self._insert_real(stream, src_format)
    byte_stream = self._stream_to_byte_stream(stream, src_format)
    response = client.call_with_body_stream(
        ('Repository.insert_stream', path, resume_tokens), byte_stream)
    status = response[0][0]
    if status not in ('ok', 'missing-basis'):
        raise errors.UnexpectedSmartServerResponse(response)
    if status == 'missing-basis':
        tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
        self._resume_tokens = tokens
        return tokens, missing_keys
    # 'ok': if a real repository object exists and has a pack collection,
    # ask that collection to reload its pack names.
    real_repo = repo._real_repository
    if real_repo is not None:
        collection = getattr(real_repo, '_pack_collection', None)
        if collection is not None:
            collection.reload_pack_names()
    # NOTE(review): an empty (tokens, missing_keys) pair mirrors the shape
    # of the 'missing-basis' return above; confirm against the contract of
    # repository.StreamSink.insert_stream.
    return [], set()
1385
1404
def _stream_to_byte_stream(self, stream, src_format):