/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

MergeĀ upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
153
153
        except errors.UnknownSmartMethod:
154
154
            medium._remember_remote_is_before((1, 13))
155
155
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
156
        except errors.UnknownErrorFromSmartServer, err:
 
157
            if err.error_tuple != ('BranchReference',):
 
158
                raise
 
159
            # We need to resolve the branch reference to determine the
 
160
            # cloning_metadir.  This causes unnecessary RPCs to open the
 
161
            # referenced branch (and bzrdir, etc) but only when the caller
 
162
            # didn't already resolve the branch reference.
 
163
            referenced_branch = self.open_branch()
 
164
            return referenced_branch.bzrdir.cloning_metadir()
156
165
        if len(response) != 3:
157
166
            raise errors.UnexpectedSmartServerResponse(response)
158
167
        control_name, repo_name, branch_info = response
256
265
        """See BzrDir._get_tree_branch()."""
257
266
        return None, self.open_branch()
258
267
 
259
 
    def open_branch(self, _unsupported=False):
 
268
    def open_branch(self, _unsupported=False, ignore_fallbacks=False):
260
269
        if _unsupported:
261
270
            raise NotImplementedError('unsupported flag support not implemented yet.')
262
271
        if self._next_open_branch_result is not None:
268
277
        if response[0] == 'ref':
269
278
            # a branch reference, use the existing BranchReference logic.
270
279
            format = BranchReferenceFormat()
271
 
            return format.open(self, _found=True, location=response[1])
 
280
            return format.open(self, _found=True, location=response[1],
 
281
                ignore_fallbacks=ignore_fallbacks)
272
282
        branch_format_name = response[1]
273
283
        if not branch_format_name:
274
284
            branch_format_name = None
275
285
        format = RemoteBranchFormat(network_name=branch_format_name)
276
 
        return RemoteBranch(self, self.find_repository(), format=format)
 
286
        return RemoteBranch(self, self.find_repository(), format=format,
 
287
            setup_stacking=not ignore_fallbacks)
277
288
 
278
289
    def _open_repo_v1(self, path):
279
290
        verb = 'BzrDir.find_repository'
417
428
        self._rich_root_data = None
418
429
 
419
430
    @property
 
431
    def fast_deltas(self):
 
432
        self._ensure_real()
 
433
        return self._custom_format.fast_deltas
 
434
 
 
435
    @property
420
436
    def rich_root_data(self):
421
437
        if self._rich_root_data is None:
422
438
            self._ensure_real()
428
444
        if self._supports_external_lookups is None:
429
445
            self._ensure_real()
430
446
            self._supports_external_lookups = \
431
 
                self._custom_format.supports_external_lookups 
 
447
                self._custom_format.supports_external_lookups
432
448
        return self._supports_external_lookups
433
449
 
434
450
    @property
643
659
        """Ensure that there is a _real_repository set.
644
660
 
645
661
        Used before calls to self._real_repository.
 
662
 
 
663
        Note that _ensure_real causes many roundtrips to the server which are
 
664
        not desirable, and prevents the use of smart one-roundtrip RPC's to
 
665
        perform complex operations (such as accessing parent data, streaming
 
666
        revisions etc). Adding calls to _ensure_real should only be done when
 
667
        bringing up new functionality, adding fallbacks for smart methods that
 
668
        require a fallback path, and never to replace an existing smart method
 
669
        invocation. If in doubt chat to the bzr network team.
646
670
        """
647
671
        if self._real_repository is None:
648
672
            self.bzrdir._ensure_real()
677
701
        self._ensure_real()
678
702
        return self._real_repository._generate_text_key_index()
679
703
 
680
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_four)
681
 
    def get_revision_graph(self, revision_id=None):
682
 
        """See Repository.get_revision_graph()."""
683
 
        return self._get_revision_graph(revision_id)
684
 
 
685
704
    def _get_revision_graph(self, revision_id):
686
705
        """Private method for using with old (< 1.2) servers to fallback."""
687
706
        if revision_id is None:
820
839
        if not self._lock_mode:
821
840
            self._lock_mode = 'r'
822
841
            self._lock_count = 1
823
 
            self._unstacked_provider.enable_cache(cache_misses=False)
 
842
            self._unstacked_provider.enable_cache(cache_misses=True)
824
843
            if self._real_repository is not None:
825
844
                self._real_repository.lock_read()
826
845
        else:
1082
1101
        self._ensure_real()
1083
1102
        return self._real_repository.make_working_trees()
1084
1103
 
 
1104
    def refresh_data(self):
 
1105
        """Re-read any data needed to to synchronise with disk.
 
1106
 
 
1107
        This method is intended to be called after another repository instance
 
1108
        (such as one used by a smart server) has inserted data into the
 
1109
        repository. It may not be called during a write group, but may be
 
1110
        called at any other time.
 
1111
        """
 
1112
        if self.is_in_write_group():
 
1113
            raise errors.InternalBzrError(
 
1114
                "May not refresh_data while in a write group.")
 
1115
        if self._real_repository is not None:
 
1116
            self._real_repository.refresh_data()
 
1117
 
1085
1118
    def revision_ids_to_search_result(self, result_set):
1086
1119
        """Convert a set of revision ids to a graph SearchResult."""
1087
1120
        result_parents = set()
1108
1141
 
1109
1142
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1110
1143
            fetch_spec=None):
 
1144
        # No base implementation to use as RemoteRepository is not a subclass
 
1145
        # of Repository; so this is a copy of Repository.fetch().
1111
1146
        if fetch_spec is not None and revision_id is not None:
1112
1147
            raise AssertionError(
1113
1148
                "fetch_spec and revision_id are mutually exclusive.")
1114
 
        # Not delegated to _real_repository so that InterRepository.get has a
1115
 
        # chance to find an InterRepository specialised for RemoteRepository.
 
1149
        if self.is_in_write_group():
 
1150
            raise errors.InternalBzrError(
 
1151
                "May not fetch while in a write group.")
 
1152
        # fast path same-url fetch operations
1116
1153
        if self.has_same_location(source) and fetch_spec is None:
1117
1154
            # check that last_revision is in 'from' and then return a
1118
1155
            # no-operation.
1120
1157
                not revision.is_null(revision_id)):
1121
1158
                self.get_revision(revision_id)
1122
1159
            return 0, []
 
1160
        # if there is no specific appropriate InterRepository, this will get
 
1161
        # the InterRepository base class, which raises an
 
1162
        # IncompatibleRepositories when asked to fetch.
1123
1163
        inter = repository.InterRepository.get(source, self)
1124
 
        try:
1125
 
            return inter.fetch(revision_id=revision_id, pb=pb,
1126
 
                    find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1127
 
        except NotImplementedError:
1128
 
            raise errors.IncompatibleRepositories(source, self)
 
1164
        return inter.fetch(revision_id=revision_id, pb=pb,
 
1165
            find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1129
1166
 
1130
1167
    def create_bundle(self, target, base, fileobj, format=None):
1131
1168
        self._ensure_real()
1162
1199
            # We already found out that the server can't understand
1163
1200
            # Repository.get_parent_map requests, so just fetch the whole
1164
1201
            # graph.
1165
 
            # XXX: Note that this will issue a deprecation warning. This is ok
1166
 
            # :- its because we're working with a deprecated server anyway, and
1167
 
            # the user will almost certainly have seen a warning about the
1168
 
            # server version already.
1169
 
            rg = self.get_revision_graph()
 
1202
            #
 
1203
            # Note that this reads the whole graph, when only some keys are
 
1204
            # wanted.  On this old server there's no way (?) to get them all
 
1205
            # in one go, and the user probably will have seen a warning about
 
1206
            # the server being old anyhow.
 
1207
            rg = self._get_revision_graph(None)
1170
1208
            # There is an api discrepency between get_parent_map and
1171
1209
            # get_revision_graph. Specifically, a "key:()" pair in
1172
1210
            # get_revision_graph just means a node has no parents. For
1203
1241
        # TODO: Manage this incrementally to avoid covering the same path
1204
1242
        # repeatedly. (The server will have to on each request, but the less
1205
1243
        # work done the better).
 
1244
        #
 
1245
        # Negative caching notes:
 
1246
        # new server sends missing when a request including the revid
 
1247
        # 'include-missing:' is present in the request.
 
1248
        # missing keys are serialised as missing:X, and we then call
 
1249
        # provider.note_missing(X) for-all X
1206
1250
        parents_map = self._unstacked_provider.get_cached_map()
1207
1251
        if parents_map is None:
1208
1252
            # Repository is not locked, so there's no cache.
1209
1253
            parents_map = {}
 
1254
        # start_set is all the keys in the cache
1210
1255
        start_set = set(parents_map)
 
1256
        # result set is all the references to keys in the cache
1211
1257
        result_parents = set()
1212
1258
        for parents in parents_map.itervalues():
1213
1259
            result_parents.update(parents)
1214
1260
        stop_keys = result_parents.difference(start_set)
 
1261
        # We don't need to send ghosts back to the server as a position to
 
1262
        # stop either.
 
1263
        stop_keys.difference_update(self._unstacked_provider.missing_keys)
1215
1264
        included_keys = start_set.intersection(result_parents)
1216
1265
        start_set.difference_update(included_keys)
1217
 
        recipe = (start_set, stop_keys, len(parents_map))
 
1266
        recipe = ('manual', start_set, stop_keys, len(parents_map))
1218
1267
        body = self._serialise_search_recipe(recipe)
1219
1268
        path = self.bzrdir._path_for_remote_call(self._client)
1220
1269
        for key in keys:
1222
1271
                raise ValueError(
1223
1272
                    "key %r not a plain string" % (key,))
1224
1273
        verb = 'Repository.get_parent_map'
1225
 
        args = (path,) + tuple(keys)
 
1274
        args = (path, 'include-missing:') + tuple(keys)
1226
1275
        try:
1227
1276
            response = self._call_with_body_bytes_expecting_body(
1228
1277
                verb, args, body)
1238
1287
            # To avoid having to disconnect repeatedly, we keep track of the
1239
1288
            # fact the server doesn't understand remote methods added in 1.2.
1240
1289
            medium._remember_remote_is_before((1, 2))
1241
 
            return self.get_revision_graph(None)
 
1290
            # Recurse just once and we should use the fallback code.
 
1291
            return self._get_parent_map_rpc(keys)
1242
1292
        response_tuple, response_handler = response
1243
1293
        if response_tuple[0] not in ['ok']:
1244
1294
            response_handler.cancel_read_body()
1255
1305
                if len(d) > 1:
1256
1306
                    revision_graph[d[0]] = d[1:]
1257
1307
                else:
1258
 
                    # No parents - so give the Graph result (NULL_REVISION,).
1259
 
                    revision_graph[d[0]] = (NULL_REVISION,)
 
1308
                    # No parents:
 
1309
                    if d[0].startswith('missing:'):
 
1310
                        revid = d[0][8:]
 
1311
                        self._unstacked_provider.note_missing_key(revid)
 
1312
                    else:
 
1313
                        # no parents - so give the Graph result
 
1314
                        # (NULL_REVISION,).
 
1315
                        revision_graph[d[0]] = (NULL_REVISION,)
1260
1316
            return revision_graph
1261
1317
 
1262
1318
    @needs_read_lock
1265
1321
        return self._real_repository.get_signature_text(revision_id)
1266
1322
 
1267
1323
    @needs_read_lock
1268
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_three)
1269
 
    def get_revision_graph_with_ghosts(self, revision_ids=None):
1270
 
        self._ensure_real()
1271
 
        return self._real_repository.get_revision_graph_with_ghosts(
1272
 
            revision_ids=revision_ids)
1273
 
 
1274
 
    @needs_read_lock
1275
1324
    def get_inventory_xml(self, revision_id):
1276
1325
        self._ensure_real()
1277
1326
        return self._real_repository.get_inventory_xml(revision_id)
1289
1338
        return self._real_repository.all_revision_ids()
1290
1339
 
1291
1340
    @needs_read_lock
1292
 
    def get_deltas_for_revisions(self, revisions):
 
1341
    def get_deltas_for_revisions(self, revisions, specific_fileids=None):
1293
1342
        self._ensure_real()
1294
 
        return self._real_repository.get_deltas_for_revisions(revisions)
 
1343
        return self._real_repository.get_deltas_for_revisions(revisions,
 
1344
            specific_fileids=specific_fileids)
1295
1345
 
1296
1346
    @needs_read_lock
1297
 
    def get_revision_delta(self, revision_id):
 
1347
    def get_revision_delta(self, revision_id, specific_fileids=None):
1298
1348
        self._ensure_real()
1299
 
        return self._real_repository.get_revision_delta(revision_id)
 
1349
        return self._real_repository.get_revision_delta(revision_id,
 
1350
            specific_fileids=specific_fileids)
1300
1351
 
1301
1352
    @needs_read_lock
1302
1353
    def revision_trees(self, revision_ids):
1479
1530
        :param recipe: A search recipe (start, stop, count).
1480
1531
        :return: Serialised bytes.
1481
1532
        """
1482
 
        start_keys = ' '.join(recipe[0])
1483
 
        stop_keys = ' '.join(recipe[1])
1484
 
        count = str(recipe[2])
 
1533
        start_keys = ' '.join(recipe[1])
 
1534
        stop_keys = ' '.join(recipe[2])
 
1535
        count = str(recipe[3])
1485
1536
        return '\n'.join((start_keys, stop_keys, count))
1486
1537
 
1487
1538
    def _serialise_search_result(self, search_result):
1490
1541
            parts.extend(search_result.heads)
1491
1542
        else:
1492
1543
            recipe = search_result.get_recipe()
1493
 
            parts = ['search', self._serialise_search_recipe(recipe)]
 
1544
            parts = [recipe[0], self._serialise_search_recipe(recipe)]
1494
1545
        return '\n'.join(parts)
1495
1546
 
1496
1547
    def autopack(self):
1501
1552
            self._ensure_real()
1502
1553
            self._real_repository._pack_collection.autopack()
1503
1554
            return
1504
 
        if self._real_repository is not None:
1505
 
            # Reset the real repository's cache of pack names.
1506
 
            # XXX: At some point we may be able to skip this and just rely on
1507
 
            # the automatic retry logic to do the right thing, but for now we
1508
 
            # err on the side of being correct rather than being optimal.
1509
 
            self._real_repository._pack_collection.reload_pack_names()
 
1555
        self.refresh_data()
1510
1556
        if response[0] != 'ok':
1511
1557
            raise errors.UnexpectedSmartServerResponse(response)
1512
1558
 
1522
1568
        return result
1523
1569
 
1524
1570
    def insert_stream(self, stream, src_format, resume_tokens):
1525
 
        repo = self.target_repo
1526
 
        client = repo._client
 
1571
        target = self.target_repo
 
1572
        if target._lock_token:
 
1573
            verb = 'Repository.insert_stream_locked'
 
1574
            extra_args = (target._lock_token or '',)
 
1575
            required_version = (1, 14)
 
1576
        else:
 
1577
            verb = 'Repository.insert_stream'
 
1578
            extra_args = ()
 
1579
            required_version = (1, 13)
 
1580
        client = target._client
1527
1581
        medium = client._medium
1528
 
        if medium._is_remote_before((1, 13)):
 
1582
        if medium._is_remote_before(required_version):
1529
1583
            # No possible way this can work.
1530
1584
            return self._insert_real(stream, src_format, resume_tokens)
1531
 
        path = repo.bzrdir._path_for_remote_call(client)
 
1585
        path = target.bzrdir._path_for_remote_call(client)
1532
1586
        if not resume_tokens:
1533
1587
            # XXX: Ugly but important for correctness, *will* be fixed during
1534
1588
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1541
1595
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1542
1596
            try:
1543
1597
                response = client.call_with_body_stream(
1544
 
                    ('Repository.insert_stream', path, ''), byte_stream)
 
1598
                    (verb, path, '') + extra_args, byte_stream)
1545
1599
            except errors.UnknownSmartMethod:
1546
 
                medium._remember_remote_is_before((1,13))
 
1600
                medium._remember_remote_is_before(required_version)
1547
1601
                return self._insert_real(stream, src_format, resume_tokens)
1548
1602
        byte_stream = smart_repo._stream_to_byte_stream(
1549
1603
            stream, src_format)
1550
1604
        resume_tokens = ' '.join(resume_tokens)
1551
1605
        response = client.call_with_body_stream(
1552
 
            ('Repository.insert_stream', path, resume_tokens), byte_stream)
 
1606
            (verb, path, resume_tokens) + extra_args, byte_stream)
1553
1607
        if response[0][0] not in ('ok', 'missing-basis'):
1554
1608
            raise errors.UnexpectedSmartServerResponse(response)
1555
1609
        if response[0][0] == 'missing-basis':
1557
1611
            resume_tokens = tokens
1558
1612
            return resume_tokens, missing_keys
1559
1613
        else:
1560
 
            if self.target_repo._real_repository is not None:
1561
 
                collection = getattr(self.target_repo._real_repository,
1562
 
                    '_pack_collection', None)
1563
 
                if collection is not None:
1564
 
                    collection.reload_pack_names()
 
1614
            self.target_repo.refresh_data()
1565
1615
            return [], set()
1566
1616
 
1567
1617
 
1569
1619
    """Stream data from a remote server."""
1570
1620
 
1571
1621
    def get_stream(self, search):
1572
 
        # streaming with fallback repositories is not well defined yet: The
1573
 
        # remote repository cannot see the fallback repositories, and thus
1574
 
        # cannot satisfy the entire search in the general case. Likewise the
1575
 
        # fallback repositories cannot reify the search to determine what they
1576
 
        # should send. It likely needs a return value in the stream listing the
1577
 
        # edge of the search to resume from in fallback repositories.
1578
 
        if self.from_repository._fallback_repositories:
1579
 
            return repository.StreamSource.get_stream(self, search)
1580
 
        repo = self.from_repository
 
1622
        if (self.from_repository._fallback_repositories and
 
1623
            self.to_format._fetch_order == 'topological'):
 
1624
            return self._real_stream(self.from_repository, search)
 
1625
        return self.missing_parents_chain(search, [self.from_repository] +
 
1626
            self.from_repository._fallback_repositories)
 
1627
 
 
1628
    def _real_stream(self, repo, search):
 
1629
        """Get a stream for search from repo.
 
1630
        
 
1631
        This never called RemoteStreamSource.get_stream, and is a heler
 
1632
        for RemoteStreamSource._get_stream to allow getting a stream 
 
1633
        reliably whether fallback back because of old servers or trying
 
1634
        to stream from a non-RemoteRepository (which the stacked support
 
1635
        code will do).
 
1636
        """
 
1637
        source = repo._get_source(self.to_format)
 
1638
        if isinstance(source, RemoteStreamSource):
 
1639
            return repository.StreamSource.get_stream(source, search)
 
1640
        return source.get_stream(search)
 
1641
 
 
1642
    def _get_stream(self, repo, search):
 
1643
        """Core worker to get a stream from repo for search.
 
1644
 
 
1645
        This is used by both get_stream and the stacking support logic. It
 
1646
        deliberately gets a stream for repo which does not need to be
 
1647
        self.from_repository. In the event that repo is not Remote, or
 
1648
        cannot do a smart stream, a fallback is made to the generic
 
1649
        repository._get_stream() interface, via self._real_stream.
 
1650
 
 
1651
        In the event of stacking, streams from _get_stream will not
 
1652
        contain all the data for search - this is normal (see get_stream).
 
1653
 
 
1654
        :param repo: A repository.
 
1655
        :param search: A search.
 
1656
        """
 
1657
        # Fallbacks may be non-smart
 
1658
        if not isinstance(repo, RemoteRepository):
 
1659
            return self._real_stream(repo, search)
1581
1660
        client = repo._client
1582
1661
        medium = client._medium
1583
1662
        if medium._is_remote_before((1, 13)):
1584
 
            # No possible way this can work.
1585
 
            return repository.StreamSource.get_stream(self, search)
 
1663
            # streaming was added in 1.13
 
1664
            return self._real_stream(repo, search)
1586
1665
        path = repo.bzrdir._path_for_remote_call(client)
1587
1666
        try:
1588
1667
            search_bytes = repo._serialise_search_result(search)
1592
1671
            response_tuple, response_handler = response
1593
1672
        except errors.UnknownSmartMethod:
1594
1673
            medium._remember_remote_is_before((1,13))
1595
 
            return repository.StreamSource.get_stream(self, search)
 
1674
            return self._real_stream(repo, search)
1596
1675
        if response_tuple[0] != 'ok':
1597
1676
            raise errors.UnexpectedSmartServerResponse(response_tuple)
1598
1677
        byte_stream = response_handler.read_streamed_body()
1603
1682
                src_format.network_name(), repo._format.network_name()))
1604
1683
        return stream
1605
1684
 
 
1685
    def missing_parents_chain(self, search, sources):
 
1686
        """Chain multiple streams together to handle stacking.
 
1687
 
 
1688
        :param search: The overall search to satisfy with streams.
 
1689
        :param sources: A list of Repository objects to query.
 
1690
        """
 
1691
        self.serialiser = self.to_format._serializer
 
1692
        self.seen_revs = set()
 
1693
        self.referenced_revs = set()
 
1694
        # If there are heads in the search, or the key count is > 0, we are not
 
1695
        # done.
 
1696
        while not search.is_empty() and len(sources) > 1:
 
1697
            source = sources.pop(0)
 
1698
            stream = self._get_stream(source, search)
 
1699
            for kind, substream in stream:
 
1700
                if kind != 'revisions':
 
1701
                    yield kind, substream
 
1702
                else:
 
1703
                    yield kind, self.missing_parents_rev_handler(substream)
 
1704
            search = search.refine(self.seen_revs, self.referenced_revs)
 
1705
            self.seen_revs = set()
 
1706
            self.referenced_revs = set()
 
1707
        if not search.is_empty():
 
1708
            for kind, stream in self._get_stream(sources[0], search):
 
1709
                yield kind, stream
 
1710
 
 
1711
    def missing_parents_rev_handler(self, substream):
 
1712
        for content in substream:
 
1713
            revision_bytes = content.get_bytes_as('fulltext')
 
1714
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
1715
            self.seen_revs.add(content.key[-1])
 
1716
            self.referenced_revs.update(revision.parent_ids)
 
1717
            yield content
 
1718
 
1606
1719
 
1607
1720
class RemoteBranchLockableFiles(LockableFiles):
1608
1721
    """A 'LockableFiles' implementation that talks to a smart server.
1648
1761
    def network_name(self):
1649
1762
        return self._network_name
1650
1763
 
1651
 
    def open(self, a_bzrdir):
1652
 
        return a_bzrdir.open_branch()
 
1764
    def open(self, a_bzrdir, ignore_fallbacks=False):
 
1765
        return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
1653
1766
 
1654
1767
    def _vfs_initialize(self, a_bzrdir):
1655
1768
        # Initialisation when using a local bzrdir object, or a non-vfs init