13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
153
153
except errors.UnknownSmartMethod:
154
154
medium._remember_remote_is_before((1, 13))
155
155
return self._vfs_cloning_metadir(require_stacking=require_stacking)
156
except errors.UnknownErrorFromSmartServer, err:
157
if err.error_tuple != ('BranchReference',):
159
# We need to resolve the branch reference to determine the
160
# cloning_metadir. This causes unnecessary RPCs to open the
161
# referenced branch (and bzrdir, etc) but only when the caller
162
# didn't already resolve the branch reference.
163
referenced_branch = self.open_branch()
164
return referenced_branch.bzrdir.cloning_metadir()
156
165
if len(response) != 3:
157
166
raise errors.UnexpectedSmartServerResponse(response)
158
167
control_name, repo_name, branch_info = response
268
277
if response[0] == 'ref':
269
278
# a branch reference, use the existing BranchReference logic.
270
279
format = BranchReferenceFormat()
271
return format.open(self, _found=True, location=response[1])
280
return format.open(self, _found=True, location=response[1],
281
ignore_fallbacks=ignore_fallbacks)
272
282
branch_format_name = response[1]
273
283
if not branch_format_name:
274
284
branch_format_name = None
275
285
format = RemoteBranchFormat(network_name=branch_format_name)
276
return RemoteBranch(self, self.find_repository(), format=format)
286
return RemoteBranch(self, self.find_repository(), format=format,
287
setup_stacking=not ignore_fallbacks)
278
289
def _open_repo_v1(self, path):
279
290
verb = 'BzrDir.find_repository'
643
659
"""Ensure that there is a _real_repository set.
645
661
Used before calls to self._real_repository.
663
Note that _ensure_real causes many roundtrips to the server which are
664
not desirable, and prevents the use of smart one-roundtrip RPCs to
665
perform complex operations (such as accessing parent data, streaming
666
revisions etc). Adding calls to _ensure_real should only be done when
667
bringing up new functionality, adding fallbacks for smart methods that
668
require a fallback path, and never to replace an existing smart method
669
invocation. If in doubt chat to the bzr network team.
647
671
if self._real_repository is None:
648
672
self.bzrdir._ensure_real()
677
701
self._ensure_real()
678
702
return self._real_repository._generate_text_key_index()
680
@symbol_versioning.deprecated_method(symbol_versioning.one_four)
681
def get_revision_graph(self, revision_id=None):
682
"""See Repository.get_revision_graph()."""
683
return self._get_revision_graph(revision_id)
685
704
def _get_revision_graph(self, revision_id):
686
705
"""Private method for use with old (< 1.2) servers as a fallback."""
687
706
if revision_id is None:
1082
1101
self._ensure_real()
1083
1102
return self._real_repository.make_working_trees()
1104
def refresh_data(self):
1105
"""Re-read any data needed to synchronise with disk.
1107
This method is intended to be called after another repository instance
1108
(such as one used by a smart server) has inserted data into the
1109
repository. It may not be called during a write group, but may be
1110
called at any other time.
1112
if self.is_in_write_group():
1113
raise errors.InternalBzrError(
1114
"May not refresh_data while in a write group.")
1115
if self._real_repository is not None:
1116
self._real_repository.refresh_data()
1085
1118
def revision_ids_to_search_result(self, result_set):
1086
1119
"""Convert a set of revision ids to a graph SearchResult."""
1087
1120
result_parents = set()
1109
1142
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1110
1143
fetch_spec=None):
1144
# No base implementation to use as RemoteRepository is not a subclass
1145
# of Repository; so this is a copy of Repository.fetch().
1111
1146
if fetch_spec is not None and revision_id is not None:
1112
1147
raise AssertionError(
1113
1148
"fetch_spec and revision_id are mutually exclusive.")
1114
# Not delegated to _real_repository so that InterRepository.get has a
1115
# chance to find an InterRepository specialised for RemoteRepository.
1149
if self.is_in_write_group():
1150
raise errors.InternalBzrError(
1151
"May not fetch while in a write group.")
1152
# fast path same-url fetch operations
1116
1153
if self.has_same_location(source) and fetch_spec is None:
1117
1154
# check that last_revision is in 'from' and then return a
1118
1155
# no-operation.
1120
1157
not revision.is_null(revision_id)):
1121
1158
self.get_revision(revision_id)
1160
# if there is no specific appropriate InterRepository, this will get
1161
# the InterRepository base class, which raises an
1162
# IncompatibleRepositories when asked to fetch.
1123
1163
inter = repository.InterRepository.get(source, self)
1125
return inter.fetch(revision_id=revision_id, pb=pb,
1126
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1127
except NotImplementedError:
1128
raise errors.IncompatibleRepositories(source, self)
1164
return inter.fetch(revision_id=revision_id, pb=pb,
1165
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1130
1167
def create_bundle(self, target, base, fileobj, format=None):
1131
1168
self._ensure_real()
1162
1199
# We already found out that the server can't understand
1163
1200
# Repository.get_parent_map requests, so just fetch the whole
1165
# XXX: Note that this will issue a deprecation warning. This is ok
1166
# :- its because we're working with a deprecated server anyway, and
1167
# the user will almost certainly have seen a warning about the
1168
# server version already.
1169
rg = self.get_revision_graph()
1203
# Note that this reads the whole graph, when only some keys are
1204
# wanted. On this old server there's no way (?) to get them all
1205
# in one go, and the user probably will have seen a warning about
1206
# the server being old anyhow.
1207
rg = self._get_revision_graph(None)
1170
1208
# There is an API discrepancy between get_parent_map and
1171
1209
# get_revision_graph. Specifically, a "key:()" pair in
1172
1210
# get_revision_graph just means a node has no parents. For
1203
1241
# TODO: Manage this incrementally to avoid covering the same path
1204
1242
# repeatedly. (The server will have to on each request, but the less
1205
1243
# work done the better).
1245
# Negative caching notes:
1246
# new server sends missing when a request including the revid
1247
# 'include-missing:' is present in the request.
1248
# missing keys are serialised as missing:X, and we then call
1249
# provider.note_missing(X) for-all X
1206
1250
parents_map = self._unstacked_provider.get_cached_map()
1207
1251
if parents_map is None:
1208
1252
# Repository is not locked, so there's no cache.
1209
1253
parents_map = {}
1254
# start_set is all the keys in the cache
1210
1255
start_set = set(parents_map)
1256
# result set is all the references to keys in the cache
1211
1257
result_parents = set()
1212
1258
for parents in parents_map.itervalues():
1213
1259
result_parents.update(parents)
1214
1260
stop_keys = result_parents.difference(start_set)
1261
# We don't need to send ghosts back to the server as a position to
1263
stop_keys.difference_update(self._unstacked_provider.missing_keys)
1215
1264
included_keys = start_set.intersection(result_parents)
1216
1265
start_set.difference_update(included_keys)
1217
recipe = (start_set, stop_keys, len(parents_map))
1266
recipe = ('manual', start_set, stop_keys, len(parents_map))
1218
1267
body = self._serialise_search_recipe(recipe)
1219
1268
path = self.bzrdir._path_for_remote_call(self._client)
1220
1269
for key in keys:
1238
1287
# To avoid having to disconnect repeatedly, we keep track of the
1239
1288
# fact the server doesn't understand remote methods added in 1.2.
1240
1289
medium._remember_remote_is_before((1, 2))
1241
return self.get_revision_graph(None)
1290
# Recurse just once and we should use the fallback code.
1291
return self._get_parent_map_rpc(keys)
1242
1292
response_tuple, response_handler = response
1243
1293
if response_tuple[0] not in ['ok']:
1244
1294
response_handler.cancel_read_body()
1256
1306
revision_graph[d[0]] = d[1:]
1258
# No parents - so give the Graph result (NULL_REVISION,).
1259
revision_graph[d[0]] = (NULL_REVISION,)
1309
if d[0].startswith('missing:'):
1311
self._unstacked_provider.note_missing_key(revid)
1313
# no parents - so give the Graph result
1315
revision_graph[d[0]] = (NULL_REVISION,)
1260
1316
return revision_graph
1262
1318
@needs_read_lock
1265
1321
return self._real_repository.get_signature_text(revision_id)
1267
1323
@needs_read_lock
1268
@symbol_versioning.deprecated_method(symbol_versioning.one_three)
1269
def get_revision_graph_with_ghosts(self, revision_ids=None):
1271
return self._real_repository.get_revision_graph_with_ghosts(
1272
revision_ids=revision_ids)
1275
1324
def get_inventory_xml(self, revision_id):
1276
1325
self._ensure_real()
1277
1326
return self._real_repository.get_inventory_xml(revision_id)
1289
1338
return self._real_repository.all_revision_ids()
1291
1340
@needs_read_lock
1292
def get_deltas_for_revisions(self, revisions):
1341
def get_deltas_for_revisions(self, revisions, specific_fileids=None):
1293
1342
self._ensure_real()
1294
return self._real_repository.get_deltas_for_revisions(revisions)
1343
return self._real_repository.get_deltas_for_revisions(revisions,
1344
specific_fileids=specific_fileids)
1296
1346
@needs_read_lock
1297
def get_revision_delta(self, revision_id):
1347
def get_revision_delta(self, revision_id, specific_fileids=None):
1298
1348
self._ensure_real()
1299
return self._real_repository.get_revision_delta(revision_id)
1349
return self._real_repository.get_revision_delta(revision_id,
1350
specific_fileids=specific_fileids)
1301
1352
@needs_read_lock
1302
1353
def revision_trees(self, revision_ids):
1479
1530
:param recipe: A search recipe (kind, start, stop, count), e.g. kind 'manual'.
1480
1531
:return: Serialised bytes.
1482
start_keys = ' '.join(recipe[0])
1483
stop_keys = ' '.join(recipe[1])
1484
count = str(recipe[2])
1533
start_keys = ' '.join(recipe[1])
1534
stop_keys = ' '.join(recipe[2])
1535
count = str(recipe[3])
1485
1536
return '\n'.join((start_keys, stop_keys, count))
1487
1538
def _serialise_search_result(self, search_result):
1501
1552
self._ensure_real()
1502
1553
self._real_repository._pack_collection.autopack()
1504
if self._real_repository is not None:
1505
# Reset the real repository's cache of pack names.
1506
# XXX: At some point we may be able to skip this and just rely on
1507
# the automatic retry logic to do the right thing, but for now we
1508
# err on the side of being correct rather than being optimal.
1509
self._real_repository._pack_collection.reload_pack_names()
1510
1556
if response[0] != 'ok':
1511
1557
raise errors.UnexpectedSmartServerResponse(response)
1524
1570
def insert_stream(self, stream, src_format, resume_tokens):
1525
repo = self.target_repo
1526
client = repo._client
1571
target = self.target_repo
1572
if target._lock_token:
1573
verb = 'Repository.insert_stream_locked'
1574
extra_args = (target._lock_token or '',)
1575
required_version = (1, 14)
1577
verb = 'Repository.insert_stream'
1579
required_version = (1, 13)
1580
client = target._client
1527
1581
medium = client._medium
1528
if medium._is_remote_before((1, 13)):
1582
if medium._is_remote_before(required_version):
1529
1583
# No possible way this can work.
1530
1584
return self._insert_real(stream, src_format, resume_tokens)
1531
path = repo.bzrdir._path_for_remote_call(client)
1585
path = target.bzrdir._path_for_remote_call(client)
1532
1586
if not resume_tokens:
1533
1587
# XXX: Ugly but important for correctness, *will* be fixed during
1534
1588
# 1.13 cycle. Pushing a stream that is interrupted results in a
1541
1595
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1543
1597
response = client.call_with_body_stream(
1544
('Repository.insert_stream', path, ''), byte_stream)
1598
(verb, path, '') + extra_args, byte_stream)
1545
1599
except errors.UnknownSmartMethod:
1546
medium._remember_remote_is_before((1,13))
1600
medium._remember_remote_is_before(required_version)
1547
1601
return self._insert_real(stream, src_format, resume_tokens)
1548
1602
byte_stream = smart_repo._stream_to_byte_stream(
1549
1603
stream, src_format)
1550
1604
resume_tokens = ' '.join(resume_tokens)
1551
1605
response = client.call_with_body_stream(
1552
('Repository.insert_stream', path, resume_tokens), byte_stream)
1606
(verb, path, resume_tokens) + extra_args, byte_stream)
1553
1607
if response[0][0] not in ('ok', 'missing-basis'):
1554
1608
raise errors.UnexpectedSmartServerResponse(response)
1555
1609
if response[0][0] == 'missing-basis':
1557
1611
resume_tokens = tokens
1558
1612
return resume_tokens, missing_keys
1560
if self.target_repo._real_repository is not None:
1561
collection = getattr(self.target_repo._real_repository,
1562
'_pack_collection', None)
1563
if collection is not None:
1564
collection.reload_pack_names()
1614
self.target_repo.refresh_data()
1565
1615
return [], set()
1569
1619
"""Stream data from a remote server."""
1571
1621
def get_stream(self, search):
1572
# streaming with fallback repositories is not well defined yet: The
1573
# remote repository cannot see the fallback repositories, and thus
1574
# cannot satisfy the entire search in the general case. Likewise the
1575
# fallback repositories cannot reify the search to determine what they
1576
# should send. It likely needs a return value in the stream listing the
1577
# edge of the search to resume from in fallback repositories.
1578
if self.from_repository._fallback_repositories:
1579
return repository.StreamSource.get_stream(self, search)
1580
repo = self.from_repository
1622
if (self.from_repository._fallback_repositories and
1623
self.to_format._fetch_order == 'topological'):
1624
return self._real_stream(self.from_repository, search)
1625
return self.missing_parents_chain(search, [self.from_repository] +
1626
self.from_repository._fallback_repositories)
1628
def _real_stream(self, repo, search):
1629
"""Get a stream for search from repo.
1631
This never calls RemoteStreamSource.get_stream, and is a helper
1632
for RemoteStreamSource._get_stream to allow getting a stream
1633
reliably whether falling back because of old servers or trying
1634
to stream from a non-RemoteRepository (which the stacked support
1637
source = repo._get_source(self.to_format)
1638
if isinstance(source, RemoteStreamSource):
1639
return repository.StreamSource.get_stream(source, search)
1640
return source.get_stream(search)
1642
def _get_stream(self, repo, search):
1643
"""Core worker to get a stream from repo for search.
1645
This is used by both get_stream and the stacking support logic. It
1646
deliberately gets a stream for repo which does not need to be
1647
self.from_repository. In the event that repo is not Remote, or
1648
cannot do a smart stream, a fallback is made to the generic
1649
repository._get_stream() interface, via self._real_stream.
1651
In the event of stacking, streams from _get_stream will not
1652
contain all the data for search - this is normal (see get_stream).
1654
:param repo: A repository.
1655
:param search: A search.
1657
# Fallbacks may be non-smart
1658
if not isinstance(repo, RemoteRepository):
1659
return self._real_stream(repo, search)
1581
1660
client = repo._client
1582
1661
medium = client._medium
1583
1662
if medium._is_remote_before((1, 13)):
1584
# No possible way this can work.
1585
return repository.StreamSource.get_stream(self, search)
1663
# streaming was added in 1.13
1664
return self._real_stream(repo, search)
1586
1665
path = repo.bzrdir._path_for_remote_call(client)
1588
1667
search_bytes = repo._serialise_search_result(search)
1592
1671
response_tuple, response_handler = response
1593
1672
except errors.UnknownSmartMethod:
1594
1673
medium._remember_remote_is_before((1,13))
1595
return repository.StreamSource.get_stream(self, search)
1674
return self._real_stream(repo, search)
1596
1675
if response_tuple[0] != 'ok':
1597
1676
raise errors.UnexpectedSmartServerResponse(response_tuple)
1598
1677
byte_stream = response_handler.read_streamed_body()
1603
1682
src_format.network_name(), repo._format.network_name()))
1685
def missing_parents_chain(self, search, sources):
1686
"""Chain multiple streams together to handle stacking.
1688
:param search: The overall search to satisfy with streams.
1689
:param sources: A list of Repository objects to query.
1691
self.serialiser = self.to_format._serializer
1692
self.seen_revs = set()
1693
self.referenced_revs = set()
1694
# If there are heads in the search, or the key count is > 0, we are not
1696
while not search.is_empty() and len(sources) > 1:
1697
source = sources.pop(0)
1698
stream = self._get_stream(source, search)
1699
for kind, substream in stream:
1700
if kind != 'revisions':
1701
yield kind, substream
1703
yield kind, self.missing_parents_rev_handler(substream)
1704
search = search.refine(self.seen_revs, self.referenced_revs)
1705
self.seen_revs = set()
1706
self.referenced_revs = set()
1707
if not search.is_empty():
1708
for kind, stream in self._get_stream(sources[0], search):
1711
def missing_parents_rev_handler(self, substream):
1712
for content in substream:
1713
revision_bytes = content.get_bytes_as('fulltext')
1714
revision = self.serialiser.read_revision_from_string(revision_bytes)
1715
self.seen_revs.add(content.key[-1])
1716
self.referenced_revs.update(revision.parent_ids)
1607
1720
class RemoteBranchLockableFiles(LockableFiles):
1608
1721
"""A 'LockableFiles' implementation that talks to a smart server.
1648
1761
def network_name(self):
1649
1762
return self._network_name
1651
def open(self, a_bzrdir):
1652
return a_bzrdir.open_branch()
1764
def open(self, a_bzrdir, ignore_fallbacks=False):
1765
return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
1654
1767
def _vfs_initialize(self, a_bzrdir):
1655
1768
# Initialisation when using a local bzrdir object, or a non-vfs init