1680
1685
def insert_stream(self, stream, src_format, resume_tokens):
1681
1686
target = self.target_repo
1682
1687
target._unstacked_provider.missing_keys.clear()
1688
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
1683
1689
if target._lock_token:
1684
verb = 'Repository.insert_stream_locked'
1685
extra_args = (target._lock_token or '',)
1686
required_version = (1, 14)
1690
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
1691
lock_args = (target._lock_token or '',)
1688
verb = 'Repository.insert_stream'
1690
required_version = (1, 13)
1693
candidate_calls.append(('Repository.insert_stream', (1, 13)))
1691
1695
client = target._client
1692
1696
medium = client._medium
1693
if medium._is_remote_before(required_version):
1694
# No possible way this can work.
1695
return self._insert_real(stream, src_format, resume_tokens)
1696
1697
path = target.bzrdir._path_for_remote_call(client)
1697
if not resume_tokens:
1698
# XXX: Ugly but important for correctness, *will* be fixed during
1699
# 1.13 cycle. Pushing a stream that is interrupted results in a
1700
# fallback to the _real_repositories sink *with a partial stream*.
1701
# That's bad because we insert less data than bzr expected. To avoid
1702
# this we do a trial push to make sure the verb is accessible, and
1703
# do not fallback when actually pushing the stream. A cleanup patch
1704
# is going to look at rewinding/restarting the stream/partial
1698
# Probe for the verb to use with an empty stream before sending the
1699
# real stream to it. We do this both to avoid the risk of sending a
1700
# large request that is then rejected, and because we don't want to
1701
# implement a way to buffer, rewind, or restart the stream.
1703
for verb, required_version in candidate_calls:
1704
if medium._is_remote_before(required_version):
1707
# We've already done the probing (and set _is_remote_before) on
1708
# a previous insert.
1706
1711
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1708
1713
response = client.call_with_body_stream(
1709
(verb, path, '') + extra_args, byte_stream)
1714
(verb, path, '') + lock_args, byte_stream)
1710
1715
except errors.UnknownSmartMethod:
1711
1716
medium._remember_remote_is_before(required_version)
1712
return self._insert_real(stream, src_format, resume_tokens)
1722
return self._insert_real(stream, src_format, resume_tokens)
1723
self._last_inv_record = None
1724
self._last_substream = None
1725
if required_version < (1, 19):
1726
# Remote side doesn't support inventory deltas. Wrap the stream to
1727
# make sure we don't send any. If the stream contains inventory
1728
# deltas we'll interrupt the smart insert_stream request and
1730
stream = self._stop_stream_if_inventory_delta(stream)
1713
1731
byte_stream = smart_repo._stream_to_byte_stream(
1714
1732
stream, src_format)
1715
1733
resume_tokens = ' '.join(resume_tokens)
1716
1734
response = client.call_with_body_stream(
1717
(verb, path, resume_tokens) + extra_args, byte_stream)
1735
(verb, path, resume_tokens) + lock_args, byte_stream)
1718
1736
if response[0][0] not in ('ok', 'missing-basis'):
1719
1737
raise errors.UnexpectedSmartServerResponse(response)
1738
if self._last_substream is not None:
1739
# The stream included an inventory-delta record, but the remote
1740
# side isn't new enough to support them. So we need to send the
1741
# rest of the stream via VFS.
1742
self.target_repo.refresh_data()
1743
return self._resume_stream_with_vfs(response, src_format)
1720
1744
if response[0][0] == 'missing-basis':
1721
1745
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1722
1746
resume_tokens = tokens
1725
1749
self.target_repo.refresh_data()
1726
1750
return [], set()
1752
def _resume_stream_with_vfs(self, response, src_format):
1753
"""Resume sending a stream via VFS, first resending the record and
1754
substream that couldn't be sent via an insert_stream verb.
1756
if response[0][0] == 'missing-basis':
1757
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1758
# Ignore missing_keys, we haven't finished inserting yet
1761
def resume_substream():
1762
# Yield the substream that was interrupted.
1763
for record in self._last_substream:
1765
self._last_substream = None
1766
def resume_stream():
1767
# Finish sending the interrupted substream
1768
yield ('inventory-deltas', resume_substream())
1769
# Then simply continue sending the rest of the stream.
1770
for substream_kind, substream in self._last_stream:
1771
yield substream_kind, substream
1772
return self._insert_real(resume_stream(), src_format, tokens)
1774
def _stop_stream_if_inventory_delta(self, stream):
1775
"""Normally this just lets the original stream pass-through unchanged.
1777
However if any 'inventory-deltas' substream occurs it will stop
1778
streaming, and store the interrupted substream and stream in
1779
self._last_substream and self._last_stream so that the stream can be
1780
resumed by _resume_stream_with_vfs.
1783
stream_iter = iter(stream)
1784
for substream_kind, substream in stream_iter:
1785
if substream_kind == 'inventory-deltas':
1786
self._last_substream = substream
1787
self._last_stream = stream_iter
1790
yield substream_kind, substream
1729
1793
class RemoteStreamSource(repository.StreamSource):
1730
1794
"""Stream data from a remote server."""
1733
1797
if (self.from_repository._fallback_repositories and
1734
1798
self.to_format._fetch_order == 'topological'):
1735
1799
return self._real_stream(self.from_repository, search)
1736
return self.missing_parents_chain(search, [self.from_repository] +
1737
self.from_repository._fallback_repositories)
1802
repos = [self.from_repository]
1808
repos.extend(repo._fallback_repositories)
1809
sources.append(repo)
1810
return self.missing_parents_chain(search, sources)
1812
def get_stream_for_missing_keys(self, missing_keys):
1813
self.from_repository._ensure_real()
1814
real_repo = self.from_repository._real_repository
1815
real_source = real_repo._get_source(self.to_format)
1816
return real_source.get_stream_for_missing_keys(missing_keys)
1739
1818
def _real_stream(self, repo, search):
1740
1819
"""Get a stream for search from repo.
1770
1850
return self._real_stream(repo, search)
1771
1851
client = repo._client
1772
1852
medium = client._medium
1773
if medium._is_remote_before((1, 13)):
1774
# streaming was added in 1.13
1775
return self._real_stream(repo, search)
1776
1853
path = repo.bzrdir._path_for_remote_call(client)
1778
search_bytes = repo._serialise_search_result(search)
1779
response = repo._call_with_body_bytes_expecting_body(
1780
'Repository.get_stream',
1781
(path, self.to_format.network_name()), search_bytes)
1782
response_tuple, response_handler = response
1783
except errors.UnknownSmartMethod:
1784
medium._remember_remote_is_before((1,13))
1854
search_bytes = repo._serialise_search_result(search)
1855
args = (path, self.to_format.network_name())
1857
('Repository.get_stream_1.19', (1, 19)),
1858
('Repository.get_stream', (1, 13))]
1860
for verb, version in candidate_verbs:
1861
if medium._is_remote_before(version):
1864
response = repo._call_with_body_bytes_expecting_body(
1865
verb, args, search_bytes)
1866
except errors.UnknownSmartMethod:
1867
medium._remember_remote_is_before(version)
1869
response_tuple, response_handler = response
1785
1873
return self._real_stream(repo, search)
1786
1874
if response_tuple[0] != 'ok':
1787
1875
raise errors.UnexpectedSmartServerResponse(response_tuple)