/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

merge bzr.dev r4164

Show diffs side-by-side

added added

removed removed

Lines of Context:
73
73
def response_tuple_to_repo_format(response):
74
74
    """Convert a response tuple describing a repository format to a format."""
75
75
    format = RemoteRepositoryFormat()
76
 
    format.rich_root_data = (response[0] == 'yes')
77
 
    format.supports_tree_reference = (response[1] == 'yes')
78
 
    format.supports_external_lookups = (response[2] == 'yes')
 
76
    format._rich_root_data = (response[0] == 'yes')
 
77
    format._supports_tree_reference = (response[1] == 'yes')
 
78
    format._supports_external_lookups = (response[2] == 'yes')
79
79
    format._network_name = response[3]
80
80
    return format
81
81
 
412
412
        self._custom_format = None
413
413
        self._network_name = None
414
414
        self._creating_bzrdir = None
 
415
        self._supports_external_lookups = None
 
416
        self._supports_tree_reference = None
 
417
        self._rich_root_data = None
 
418
 
 
419
    @property
 
420
    def rich_root_data(self):
 
421
        if self._rich_root_data is None:
 
422
            self._ensure_real()
 
423
            self._rich_root_data = self._custom_format.rich_root_data
 
424
        return self._rich_root_data
 
425
 
 
426
    @property
 
427
    def supports_external_lookups(self):
 
428
        if self._supports_external_lookups is None:
 
429
            self._ensure_real()
 
430
            self._supports_external_lookups = \
 
431
                self._custom_format.supports_external_lookups
 
432
        return self._supports_external_lookups
 
433
 
 
434
    @property
 
435
    def supports_tree_reference(self):
 
436
        if self._supports_tree_reference is None:
 
437
            self._ensure_real()
 
438
            self._supports_tree_reference = \
 
439
                self._custom_format.supports_tree_reference
 
440
        return self._supports_tree_reference
415
441
 
416
442
    def _vfs_initialize(self, a_bzrdir, shared):
417
443
        """Helper for common code in initialize."""
617
643
        """Ensure that there is a _real_repository set.
618
644
 
619
645
        Used before calls to self._real_repository.
 
646
 
 
647
        Note that _ensure_real causes many roundtrips to the server which are
 
648
        not desirable, and prevents the use of smart one-roundtrip RPC's to
 
649
        perform complex operations (such as accessing parent data, streaming
 
650
        revisions etc). Adding calls to _ensure_real should only be done when
 
651
        bringing up new functionality, adding fallbacks for smart methods that
 
652
        require a fallback path, and never to replace an existing smart method
 
653
        invocation. If in doubt chat to the bzr network team.
620
654
        """
621
655
        if self._real_repository is None:
622
656
            self.bzrdir._ensure_real()
995
1029
 
996
1030
        :param repository: A repository.
997
1031
        """
998
 
        # XXX: At the moment the RemoteRepository will allow fallbacks
999
 
        # unconditionally - however, a _real_repository will usually exist,
1000
 
        # and may raise an error if it's not accommodated by the underlying
1001
 
        # format.  Eventually we should check when opening the repository
1002
 
        # whether it's willing to allow them or not.
1003
 
        #
 
1032
        if not self._format.supports_external_lookups:
 
1033
            raise errors.UnstackableRepositoryFormat(
 
1034
                self._format.network_name(), self.base)
1004
1035
        # We need to accumulate additional repositories here, to pass them in
1005
1036
        # on various RPC's.
1006
1037
        #
1059
1090
        self._ensure_real()
1060
1091
        return self._real_repository.make_working_trees()
1061
1092
 
 
1093
    def refresh_data(self):
 
1094
        """Re-read any data needed to to synchronise with disk.
 
1095
 
 
1096
        This method is intended to be called after another repository instance
 
1097
        (such as one used by a smart server) has inserted data into the
 
1098
        repository. It may not be called during a write group, but may be
 
1099
        called at any other time.
 
1100
        """
 
1101
        if self.is_in_write_group():
 
1102
            raise errors.InternalBzrError(
 
1103
                "May not refresh_data while in a write group.")
 
1104
        if self._real_repository is not None:
 
1105
            self._real_repository.refresh_data()
 
1106
 
1062
1107
    def revision_ids_to_search_result(self, result_set):
1063
1108
        """Convert a set of revision ids to a graph SearchResult."""
1064
1109
        result_parents = set()
1085
1130
 
1086
1131
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1087
1132
            fetch_spec=None):
 
1133
        # No base implementation to use as RemoteRepository is not a subclass
 
1134
        # of Repository; so this is a copy of Repository.fetch().
1088
1135
        if fetch_spec is not None and revision_id is not None:
1089
1136
            raise AssertionError(
1090
1137
                "fetch_spec and revision_id are mutually exclusive.")
1091
 
        # Not delegated to _real_repository so that InterRepository.get has a
1092
 
        # chance to find an InterRepository specialised for RemoteRepository.
 
1138
        if self.is_in_write_group():
 
1139
            raise errors.InternalBzrError(
 
1140
                "May not fetch while in a write group.")
 
1141
        # fast path same-url fetch operations
1093
1142
        if self.has_same_location(source) and fetch_spec is None:
1094
1143
            # check that last_revision is in 'from' and then return a
1095
1144
            # no-operation.
1097
1146
                not revision.is_null(revision_id)):
1098
1147
                self.get_revision(revision_id)
1099
1148
            return 0, []
 
1149
        # if there is no specific appropriate InterRepository, this will get
 
1150
        # the InterRepository base class, which raises an
 
1151
        # IncompatibleRepositories when asked to fetch.
1100
1152
        inter = repository.InterRepository.get(source, self)
1101
 
        try:
1102
 
            return inter.fetch(revision_id=revision_id, pb=pb,
1103
 
                    find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1104
 
        except NotImplementedError:
1105
 
            raise errors.IncompatibleRepositories(source, self)
 
1153
        return inter.fetch(revision_id=revision_id, pb=pb,
 
1154
            find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1106
1155
 
1107
1156
    def create_bundle(self, target, base, fileobj, format=None):
1108
1157
        self._ensure_real()
1191
1240
        stop_keys = result_parents.difference(start_set)
1192
1241
        included_keys = start_set.intersection(result_parents)
1193
1242
        start_set.difference_update(included_keys)
1194
 
        recipe = (start_set, stop_keys, len(parents_map))
 
1243
        recipe = ('manual', start_set, stop_keys, len(parents_map))
1195
1244
        body = self._serialise_search_recipe(recipe)
1196
1245
        path = self.bzrdir._path_for_remote_call(self._client)
1197
1246
        for key in keys:
1456
1505
        :param recipe: A search recipe (start, stop, count).
1457
1506
        :return: Serialised bytes.
1458
1507
        """
1459
 
        start_keys = ' '.join(recipe[0])
1460
 
        stop_keys = ' '.join(recipe[1])
1461
 
        count = str(recipe[2])
 
1508
        start_keys = ' '.join(recipe[1])
 
1509
        stop_keys = ' '.join(recipe[2])
 
1510
        count = str(recipe[3])
1462
1511
        return '\n'.join((start_keys, stop_keys, count))
1463
1512
 
1464
1513
    def _serialise_search_result(self, search_result):
1467
1516
            parts.extend(search_result.heads)
1468
1517
        else:
1469
1518
            recipe = search_result.get_recipe()
1470
 
            parts = ['search', self._serialise_search_recipe(recipe)]
 
1519
            parts = [recipe[0], self._serialise_search_recipe(recipe)]
1471
1520
        return '\n'.join(parts)
1472
1521
 
1473
1522
    def autopack(self):
1478
1527
            self._ensure_real()
1479
1528
            self._real_repository._pack_collection.autopack()
1480
1529
            return
1481
 
        if self._real_repository is not None:
1482
 
            # Reset the real repository's cache of pack names.
1483
 
            # XXX: At some point we may be able to skip this and just rely on
1484
 
            # the automatic retry logic to do the right thing, but for now we
1485
 
            # err on the side of being correct rather than being optimal.
1486
 
            self._real_repository._pack_collection.reload_pack_names()
 
1530
        self.refresh_data()
1487
1531
        if response[0] != 'ok':
1488
1532
            raise errors.UnexpectedSmartServerResponse(response)
1489
1533
 
1499
1543
        return result
1500
1544
 
1501
1545
    def insert_stream(self, stream, src_format, resume_tokens):
1502
 
        repo = self.target_repo
1503
 
        client = repo._client
 
1546
        target = self.target_repo
 
1547
        if target._lock_token:
 
1548
            verb = 'Repository.insert_stream_locked'
 
1549
            extra_args = (target._lock_token or '',)
 
1550
            required_version = (1, 14)
 
1551
        else:
 
1552
            verb = 'Repository.insert_stream'
 
1553
            extra_args = ()
 
1554
            required_version = (1, 13)
 
1555
        client = target._client
1504
1556
        medium = client._medium
1505
 
        if medium._is_remote_before((1, 13)):
 
1557
        if medium._is_remote_before(required_version):
1506
1558
            # No possible way this can work.
1507
1559
            return self._insert_real(stream, src_format, resume_tokens)
1508
 
        path = repo.bzrdir._path_for_remote_call(client)
 
1560
        path = target.bzrdir._path_for_remote_call(client)
1509
1561
        if not resume_tokens:
1510
1562
            # XXX: Ugly but important for correctness, *will* be fixed during
1511
1563
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1518
1570
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1519
1571
            try:
1520
1572
                response = client.call_with_body_stream(
1521
 
                    ('Repository.insert_stream', path, ''), byte_stream)
 
1573
                    (verb, path, '') + extra_args, byte_stream)
1522
1574
            except errors.UnknownSmartMethod:
1523
 
                medium._remember_remote_is_before((1,13))
 
1575
                medium._remember_remote_is_before(required_version)
1524
1576
                return self._insert_real(stream, src_format, resume_tokens)
1525
1577
        byte_stream = smart_repo._stream_to_byte_stream(
1526
1578
            stream, src_format)
1527
1579
        resume_tokens = ' '.join(resume_tokens)
1528
1580
        response = client.call_with_body_stream(
1529
 
            ('Repository.insert_stream', path, resume_tokens), byte_stream)
 
1581
            (verb, path, resume_tokens) + extra_args, byte_stream)
1530
1582
        if response[0][0] not in ('ok', 'missing-basis'):
1531
1583
            raise errors.UnexpectedSmartServerResponse(response)
1532
1584
        if response[0][0] == 'missing-basis':
1534
1586
            resume_tokens = tokens
1535
1587
            return resume_tokens, missing_keys
1536
1588
        else:
1537
 
            if self.target_repo._real_repository is not None:
1538
 
                collection = getattr(self.target_repo._real_repository,
1539
 
                    '_pack_collection', None)
1540
 
                if collection is not None:
1541
 
                    collection.reload_pack_names()
 
1589
            self.target_repo.refresh_data()
1542
1590
            return [], set()
1543
1591
 
1544
1592
 
1546
1594
    """Stream data from a remote server."""
1547
1595
 
1548
1596
    def get_stream(self, search):
1549
 
        # streaming with fallback repositories is not well defined yet: The
1550
 
        # remote repository cannot see the fallback repositories, and thus
1551
 
        # cannot satisfy the entire search in the general case. Likewise the
1552
 
        # fallback repositories cannot reify the search to determine what they
1553
 
        # should send. It likely needs a return value in the stream listing the
1554
 
        # edge of the search to resume from in fallback repositories.
1555
 
        if self.from_repository._fallback_repositories:
1556
 
            return repository.StreamSource.get_stream(self, search)
1557
 
        repo = self.from_repository
 
1597
        if (self.from_repository._fallback_repositories and
 
1598
            self.to_format._fetch_order == 'topological'):
 
1599
            return self._real_stream(self.from_repository, search)
 
1600
        return self.missing_parents_chain(search, [self.from_repository] +
 
1601
            self.from_repository._fallback_repositories)
 
1602
 
 
1603
    def _real_stream(self, repo, search):
 
1604
        """Get a stream for search from repo.
 
1605
        
 
1606
        This never called RemoteStreamSource.get_stream, and is a heler
 
1607
        for RemoteStreamSource._get_stream to allow getting a stream 
 
1608
        reliably whether fallback back because of old servers or trying
 
1609
        to stream from a non-RemoteRepository (which the stacked support
 
1610
        code will do).
 
1611
        """
 
1612
        source = repo._get_source(self.to_format)
 
1613
        if isinstance(source, RemoteStreamSource):
 
1614
            return repository.StreamSource.get_stream(source, search)
 
1615
        return source.get_stream(search)
 
1616
 
 
1617
    def _get_stream(self, repo, search):
 
1618
        """Core worker to get a stream from repo for search.
 
1619
 
 
1620
        This is used by both get_stream and the stacking support logic. It
 
1621
        deliberately gets a stream for repo which does not need to be
 
1622
        self.from_repository. In the event that repo is not Remote, or
 
1623
        cannot do a smart stream, a fallback is made to the generic
 
1624
        repository._get_stream() interface, via self._real_stream.
 
1625
 
 
1626
        In the event of stacking, streams from _get_stream will not
 
1627
        contain all the data for search - this is normal (see get_stream).
 
1628
 
 
1629
        :param repo: A repository.
 
1630
        :param search: A search.
 
1631
        """
 
1632
        # Fallbacks may be non-smart
 
1633
        if not isinstance(repo, RemoteRepository):
 
1634
            return self._real_stream(repo, search)
1558
1635
        client = repo._client
1559
1636
        medium = client._medium
1560
1637
        if medium._is_remote_before((1, 13)):
1561
 
            # No possible way this can work.
1562
 
            return repository.StreamSource.get_stream(self, search)
 
1638
            # streaming was added in 1.13
 
1639
            return self._real_stream(repo, search)
1563
1640
        path = repo.bzrdir._path_for_remote_call(client)
1564
1641
        try:
1565
1642
            search_bytes = repo._serialise_search_result(search)
1569
1646
            response_tuple, response_handler = response
1570
1647
        except errors.UnknownSmartMethod:
1571
1648
            medium._remember_remote_is_before((1,13))
1572
 
            return repository.StreamSource.get_stream(self, search)
 
1649
            return self._real_stream(repo, search)
1573
1650
        if response_tuple[0] != 'ok':
1574
1651
            raise errors.UnexpectedSmartServerResponse(response_tuple)
1575
1652
        byte_stream = response_handler.read_streamed_body()
1580
1657
                src_format.network_name(), repo._format.network_name()))
1581
1658
        return stream
1582
1659
 
 
1660
    def missing_parents_chain(self, search, sources):
 
1661
        """Chain multiple streams together to handle stacking.
 
1662
 
 
1663
        :param search: The overall search to satisfy with streams.
 
1664
        :param sources: A list of Repository objects to query.
 
1665
        """
 
1666
        self.serialiser = self.to_format._serializer
 
1667
        self.seen_revs = set()
 
1668
        self.referenced_revs = set()
 
1669
        # If there are heads in the search, or the key count is > 0, we are not
 
1670
        # done.
 
1671
        while not search.is_empty() and len(sources) > 1:
 
1672
            source = sources.pop(0)
 
1673
            stream = self._get_stream(source, search)
 
1674
            for kind, substream in stream:
 
1675
                if kind != 'revisions':
 
1676
                    yield kind, substream
 
1677
                else:
 
1678
                    yield kind, self.missing_parents_rev_handler(substream)
 
1679
            search = search.refine(self.seen_revs, self.referenced_revs)
 
1680
            self.seen_revs = set()
 
1681
            self.referenced_revs = set()
 
1682
        if not search.is_empty():
 
1683
            for kind, stream in self._get_stream(sources[0], search):
 
1684
                yield kind, stream
 
1685
 
 
1686
    def missing_parents_rev_handler(self, substream):
 
1687
        for content in substream:
 
1688
            revision_bytes = content.get_bytes_as('fulltext')
 
1689
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
1690
            self.seen_revs.add(content.key[-1])
 
1691
            self.referenced_revs.update(revision.parent_ids)
 
1692
            yield content
 
1693
 
1583
1694
 
1584
1695
class RemoteBranchLockableFiles(LockableFiles):
1585
1696
    """A 'LockableFiles' implementation that talks to a smart server.
1700
1811
        self._ensure_real()
1701
1812
        return self._custom_format.supports_tags()
1702
1813
 
 
1814
    def supports_stacking(self):
 
1815
        self._ensure_real()
 
1816
        return self._custom_format.supports_stacking()
 
1817
 
1703
1818
 
1704
1819
class RemoteBranch(branch.Branch, _RpcHelper):
1705
1820
    """Branch stored on a server accessed by HPSS RPC.