/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

merge bzr.dev r4154

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
34
34
    lockdir,
35
35
    lru_cache,
36
36
    osutils,
37
 
    remote,
38
37
    revision as _mod_revision,
39
38
    symbol_versioning,
40
39
    tsort,
637
636
        self.inventories.add_fallback_versioned_files(repository.inventories)
638
637
        self.revisions.add_fallback_versioned_files(repository.revisions)
639
638
        self.signatures.add_fallback_versioned_files(repository.signatures)
640
 
        self._fetch_order = 'topological'
641
639
 
642
640
    def _check_fallback_repository(self, repository):
643
641
        """Check that this repository can fallback to repository safely.
831
829
        self._write_group = None
832
830
        # Additional places to query for data.
833
831
        self._fallback_repositories = []
834
 
        # What order should fetch operations request streams in?
835
 
        # The default is unordered as that is the cheapest for an origin to
836
 
        # provide.
837
 
        self._fetch_order = 'unordered'
838
 
        # Does this repository use deltas that can be fetched as-deltas ?
839
 
        # (E.g. knits, where the knit deltas can be transplanted intact.
840
 
        # We default to False, which will ensure that enough data to get
841
 
        # a full text out of any fetch stream will be grabbed.
842
 
        self._fetch_uses_deltas = False
843
 
        # Should fetch trigger a reconcile after the fetch? Only needed for
844
 
        # some repository formats that can suffer internal inconsistencies.
845
 
        self._fetch_reconcile = False
846
832
        # An InventoryEntry cache, used during deserialization
847
833
        self._inventory_entry_cache = fifo_cache.FIFOCache(10*1024)
848
834
 
897
883
 
898
884
        XXX: this docstring is duplicated in many places, e.g. lockable_files.py
899
885
        """
 
886
        locked = self.is_locked()
900
887
        result = self.control_files.lock_write(token=token)
901
888
        for repo in self._fallback_repositories:
902
889
            # Writes don't affect fallback repos
903
890
            repo.lock_read()
904
 
        self._refresh_data()
 
891
        if not locked:
 
892
            self._refresh_data()
905
893
        return result
906
894
 
907
895
    def lock_read(self):
 
896
        locked = self.is_locked()
908
897
        self.control_files.lock_read()
909
898
        for repo in self._fallback_repositories:
910
899
            repo.lock_read()
911
 
        self._refresh_data()
 
900
        if not locked:
 
901
            self._refresh_data()
912
902
 
913
903
    def get_physical_lock_status(self):
914
904
        return self.control_files.get_physical_lock_status()
1097
1087
    def suspend_write_group(self):
1098
1088
        raise errors.UnsuspendableWriteGroup(self)
1099
1089
 
 
1090
    def refresh_data(self):
 
1091
        """Re-read any data needed to to synchronise with disk.
 
1092
 
 
1093
        This method is intended to be called after another repository instance
 
1094
        (such as one used by a smart server) has inserted data into the
 
1095
        repository. It may not be called during a write group, but may be
 
1096
        called at any other time.
 
1097
        """
 
1098
        if self.is_in_write_group():
 
1099
            raise errors.InternalBzrError(
 
1100
                "May not refresh_data while in a write group.")
 
1101
        self._refresh_data()
 
1102
 
1100
1103
    def resume_write_group(self, tokens):
1101
1104
        if not self.is_write_locked():
1102
1105
            raise errors.NotWriteLocked(self)
1109
1112
    def _resume_write_group(self, tokens):
1110
1113
        raise errors.UnsuspendableWriteGroup(self)
1111
1114
 
1112
 
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
 
1115
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
 
1116
            fetch_spec=None):
1113
1117
        """Fetch the content required to construct revision_id from source.
1114
1118
 
1115
 
        If revision_id is None all content is copied.
 
1119
        If revision_id is None and fetch_spec is None, then all content is
 
1120
        copied.
 
1121
 
 
1122
        fetch() may not be used when the repository is in a write group -
 
1123
        either finish the current write group before using fetch, or use
 
1124
        fetch before starting the write group.
 
1125
 
1116
1126
        :param find_ghosts: Find and copy revisions in the source that are
1117
1127
            ghosts in the target (and not reachable directly by walking out to
1118
1128
            the first-present revision in target from revision_id).
 
1129
        :param revision_id: If specified, all the content needed for this
 
1130
            revision ID will be copied to the target.  Fetch will determine for
 
1131
            itself which content needs to be copied.
 
1132
        :param fetch_spec: If specified, a SearchResult or
 
1133
            PendingAncestryResult that describes which revisions to copy.  This
 
1134
            allows copying multiple heads at once.  Mutually exclusive with
 
1135
            revision_id.
1119
1136
        """
 
1137
        if fetch_spec is not None and revision_id is not None:
 
1138
            raise AssertionError(
 
1139
                "fetch_spec and revision_id are mutually exclusive.")
 
1140
        if self.is_in_write_group():
 
1141
            raise errors.InternalBzrError(
 
1142
                "May not fetch while in a write group.")
1120
1143
        # fast path same-url fetch operations
1121
 
        if self.has_same_location(source):
 
1144
        if self.has_same_location(source) and fetch_spec is None:
1122
1145
            # check that last_revision is in 'from' and then return a
1123
1146
            # no-operation.
1124
1147
            if (revision_id is not None and
1130
1153
        # IncompatibleRepositories when asked to fetch.
1131
1154
        inter = InterRepository.get(source, self)
1132
1155
        return inter.fetch(revision_id=revision_id, pb=pb,
1133
 
            find_ghosts=find_ghosts)
 
1156
            find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1134
1157
 
1135
1158
    def create_bundle(self, target, base, fileobj, format=None):
1136
1159
        return serializer.write_bundle(self, target, base, fileobj, format)
1239
1262
        """Return a sink for streaming into this repository."""
1240
1263
        return StreamSink(self)
1241
1264
 
 
1265
    def _get_source(self, to_format):
 
1266
        """Return a source for streaming from this repository."""
 
1267
        return StreamSource(self, to_format)
 
1268
 
1242
1269
    @needs_read_lock
1243
1270
    def has_revision(self, revision_id):
1244
1271
        """True if this repository has a copy of the revision."""
1352
1379
    def find_text_key_references(self):
1353
1380
        """Find the text key references within the repository.
1354
1381
 
1355
 
        :return: a dictionary mapping (file_id, revision_id) tuples to altered file-ids to an iterable of
1356
 
        revision_ids. Each altered file-ids has the exact revision_ids that
1357
 
        altered it listed explicitly.
1358
1382
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
1359
1383
            to whether they were referred to by the inventory of the
1360
1384
            revision_id that they contain. The inventory texts from all present
1449
1473
                result[key] = True
1450
1474
        return result
1451
1475
 
 
1476
    def _inventory_xml_lines_for_keys(self, keys):
 
1477
        """Get a line iterator of the sort needed for findind references.
 
1478
 
 
1479
        Not relevant for non-xml inventory repositories.
 
1480
 
 
1481
        Ghosts in revision_keys are ignored.
 
1482
 
 
1483
        :param revision_keys: The revision keys for the inventories to inspect.
 
1484
        :return: An iterator over (inventory line, revid) for the fulltexts of
 
1485
            all of the xml inventories specified by revision_keys.
 
1486
        """
 
1487
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
 
1488
        for record in stream:
 
1489
            if record.storage_kind != 'absent':
 
1490
                chunks = record.get_bytes_as('chunked')
 
1491
                revid = record.key[-1]
 
1492
                lines = osutils.chunks_to_lines(chunks)
 
1493
                for line in lines:
 
1494
                    yield line, revid
 
1495
 
1452
1496
    def _find_file_ids_from_xml_inventory_lines(self, line_iterator,
1453
1497
        revision_ids):
1454
1498
        """Helper routine for fileids_altered_by_revision_ids.
1464
1508
        revision_ids. Each altered file-ids has the exact revision_ids that
1465
1509
        altered it listed explicitly.
1466
1510
        """
 
1511
        seen = set(self._find_text_key_references_from_xml_inventory_lines(
 
1512
                line_iterator).iterkeys())
 
1513
        # Note that revision_ids are revision keys.
 
1514
        parent_maps = self.revisions.get_parent_map(revision_ids)
 
1515
        parents = set()
 
1516
        map(parents.update, parent_maps.itervalues())
 
1517
        parents.difference_update(revision_ids)
 
1518
        parent_seen = set(self._find_text_key_references_from_xml_inventory_lines(
 
1519
            self._inventory_xml_lines_for_keys(parents)))
 
1520
        new_keys = seen - parent_seen
1467
1521
        result = {}
1468
1522
        setdefault = result.setdefault
1469
 
        for key in \
1470
 
            self._find_text_key_references_from_xml_inventory_lines(
1471
 
                line_iterator).iterkeys():
1472
 
            # once data is all ensured-consistent; then this is
1473
 
            # if revision_id == version_id
1474
 
            if key[-1:] in revision_ids:
1475
 
                setdefault(key[0], set()).add(key[-1])
 
1523
        for key in new_keys:
 
1524
            setdefault(key[0], set()).add(key[-1])
1476
1525
        return result
1477
1526
 
1478
1527
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
1576
1625
        batch_size = 10 # should be ~150MB on a 55K path tree
1577
1626
        batch_count = len(revision_order) / batch_size + 1
1578
1627
        processed_texts = 0
1579
 
        pb.update("Calculating text parents.", processed_texts, text_count)
 
1628
        pb.update("Calculating text parents", processed_texts, text_count)
1580
1629
        for offset in xrange(batch_count):
1581
1630
            to_query = revision_order[offset * batch_size:(offset + 1) *
1582
1631
                batch_size]
1586
1635
                revision_id = rev_tree.get_revision_id()
1587
1636
                parent_ids = ancestors[revision_id]
1588
1637
                for text_key in revision_keys[revision_id]:
1589
 
                    pb.update("Calculating text parents.", processed_texts)
 
1638
                    pb.update("Calculating text parents", processed_texts)
1590
1639
                    processed_texts += 1
1591
1640
                    candidate_parents = []
1592
1641
                    for parent_id in parent_ids:
1809
1858
        for repositories to maintain loaded indices across multiple locks
1810
1859
        by checking inside their implementation of this method to see
1811
1860
        whether their indices are still valid. This depends of course on
1812
 
        the disk format being validatable in this manner.
 
1861
        the disk format being validatable in this manner. This method is
 
1862
        also called by the refresh_data() public interface to cause a refresh
 
1863
        to occur while in a write lock so that data inserted by a smart server
 
1864
        push operation is visible on the client's instance of the physical
 
1865
        repository.
1813
1866
        """
1814
1867
 
1815
1868
    @needs_read_lock
1939
1992
                [parents_provider, other_repository._make_parents_provider()])
1940
1993
        return graph.Graph(parents_provider)
1941
1994
 
1942
 
    def _get_versioned_file_checker(self):
1943
 
        """Return an object suitable for checking versioned files."""
1944
 
        return _VersionedFileChecker(self)
 
1995
    def _get_versioned_file_checker(self, text_key_references=None):
 
1996
        """Return an object suitable for checking versioned files.
 
1997
        
 
1998
        :param text_key_references: if non-None, an already built
 
1999
            dictionary mapping text keys ((fileid, revision_id) tuples)
 
2000
            to whether they were referred to by the inventory of the
 
2001
            revision_id that they contain. If None, this will be
 
2002
            calculated.
 
2003
        """
 
2004
        return _VersionedFileChecker(self,
 
2005
            text_key_references=text_key_references)
1945
2006
 
1946
2007
    def revision_ids_to_search_result(self, result_set):
1947
2008
        """Convert a set of revision ids to a graph SearchResult."""
2265
2326
    # Can this repository be given external locations to lookup additional
2266
2327
    # data. Set to True or False in derived classes.
2267
2328
    supports_external_lookups = None
 
2329
    # What order should fetch operations request streams in?
 
2330
    # The default is unordered as that is the cheapest for an origin to
 
2331
    # provide.
 
2332
    _fetch_order = 'unordered'
 
2333
    # Does this repository format use deltas that can be fetched as-deltas ?
 
2334
    # (E.g. knits, where the knit deltas can be transplanted intact.
 
2335
    # We default to False, which will ensure that enough data to get
 
2336
    # a full text out of any fetch stream will be grabbed.
 
2337
    _fetch_uses_deltas = False
 
2338
    # Should fetch trigger a reconcile after the fetch? Only needed for
 
2339
    # some repository formats that can suffer internal inconsistencies.
 
2340
    _fetch_reconcile = False
2268
2341
 
2269
2342
    def __str__(self):
2270
2343
        return "<%s>" % self.__class__.__name__
2549
2622
    InterRepository.get(other).method_name(parameters).
2550
2623
    """
2551
2624
 
2552
 
    _walk_to_common_revisions_batch_size = 1
 
2625
    _walk_to_common_revisions_batch_size = 50
2553
2626
    _optimisers = []
2554
2627
    """The available optimised InterRepository types."""
2555
2628
 
2556
 
    def __init__(self, source, target):
2557
 
        InterObject.__init__(self, source, target)
2558
 
        # These two attributes may be overridden by e.g. InterOtherToRemote to
2559
 
        # provide a faster implementation.
2560
 
        self.target_get_graph = self.target.get_graph
2561
 
        self.target_get_parent_map = self.target.get_parent_map
2562
 
 
 
2629
    @needs_write_lock
2563
2630
    def copy_content(self, revision_id=None):
2564
 
        raise NotImplementedError(self.copy_content)
2565
 
 
2566
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
 
2631
        """Make a complete copy of the content in self into destination.
 
2632
 
 
2633
        This is a destructive operation! Do not use it on existing
 
2634
        repositories.
 
2635
 
 
2636
        :param revision_id: Only copy the content needed to construct
 
2637
                            revision_id and its parents.
 
2638
        """
 
2639
        try:
 
2640
            self.target.set_make_working_trees(self.source.make_working_trees())
 
2641
        except NotImplementedError:
 
2642
            pass
 
2643
        self.target.fetch(self.source, revision_id=revision_id)
 
2644
 
 
2645
    @needs_write_lock
 
2646
    def fetch(self, revision_id=None, pb=None, find_ghosts=False,
 
2647
            fetch_spec=None):
2567
2648
        """Fetch the content required to construct revision_id.
2568
2649
 
2569
2650
        The content is copied from self.source to self.target.
2572
2653
                            content is copied.
2573
2654
        :param pb: optional progress bar to use for progress reports. If not
2574
2655
                   provided a default one will be created.
2575
 
 
2576
 
        :returns: (copied_revision_count, failures).
 
2656
        :return: None.
2577
2657
        """
2578
 
        # Normally we should find a specific InterRepository subclass to do
2579
 
        # the fetch; if nothing else then at least InterSameDataRepository.
2580
 
        # If none of them is suitable it looks like fetching is not possible;
2581
 
        # we try to give a good message why.  _assert_same_model will probably
2582
 
        # give a helpful message; otherwise a generic one.
2583
 
        self._assert_same_model(self.source, self.target)
2584
 
        raise errors.IncompatibleRepositories(self.source, self.target,
2585
 
            "no suitableInterRepository found")
 
2658
        from bzrlib.fetch import RepoFetcher
 
2659
        f = RepoFetcher(to_repository=self.target,
 
2660
                               from_repository=self.source,
 
2661
                               last_revision=revision_id,
 
2662
                               fetch_spec=fetch_spec,
 
2663
                               pb=pb, find_ghosts=find_ghosts)
2586
2664
 
2587
2665
    def _walk_to_common_revisions(self, revision_ids):
2588
2666
        """Walk out from revision_ids in source to revisions target has.
2590
2668
        :param revision_ids: The start point for the search.
2591
2669
        :return: A set of revision ids.
2592
2670
        """
2593
 
        target_graph = self.target_get_graph()
 
2671
        target_graph = self.target.get_graph()
2594
2672
        revision_ids = frozenset(revision_ids)
2595
2673
        # Fast path for the case where all the revisions are already in the
2596
2674
        # target repo.
2728
2806
    def is_compatible(source, target):
2729
2807
        return InterRepository._same_model(source, target)
2730
2808
 
2731
 
    @needs_write_lock
2732
 
    def copy_content(self, revision_id=None):
2733
 
        """Make a complete copy of the content in self into destination.
2734
 
 
2735
 
        This copies both the repository's revision data, and configuration information
2736
 
        such as the make_working_trees setting.
2737
 
 
2738
 
        This is a destructive operation! Do not use it on existing
2739
 
        repositories.
2740
 
 
2741
 
        :param revision_id: Only copy the content needed to construct
2742
 
                            revision_id and its parents.
2743
 
        """
2744
 
        try:
2745
 
            self.target.set_make_working_trees(self.source.make_working_trees())
2746
 
        except NotImplementedError:
2747
 
            pass
2748
 
        # but don't bother fetching if we have the needed data now.
2749
 
        if (revision_id not in (None, _mod_revision.NULL_REVISION) and
2750
 
            self.target.has_revision(revision_id)):
2751
 
            return
2752
 
        self.target.fetch(self.source, revision_id=revision_id)
2753
 
 
2754
 
    @needs_write_lock
2755
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
2756
 
        """See InterRepository.fetch()."""
2757
 
        from bzrlib.fetch import RepoFetcher
2758
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
2759
 
               self.source, self.source._format, self.target,
2760
 
               self.target._format)
2761
 
        f = RepoFetcher(to_repository=self.target,
2762
 
                               from_repository=self.source,
2763
 
                               last_revision=revision_id,
2764
 
                               pb=pb, find_ghosts=find_ghosts)
2765
 
        return f.count_copied, f.failed_revisions
2766
 
 
2767
2809
 
2768
2810
class InterWeaveRepo(InterSameDataRepository):
2769
2811
    """Optimised code paths between Weave based repositories.
2832
2874
        else:
2833
2875
            self.target.fetch(self.source, revision_id=revision_id)
2834
2876
 
2835
 
    @needs_write_lock
2836
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
2837
 
        """See InterRepository.fetch()."""
2838
 
        from bzrlib.fetch import RepoFetcher
2839
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
2840
 
               self.source, self.source._format, self.target, self.target._format)
2841
 
        f = RepoFetcher(to_repository=self.target,
2842
 
                               from_repository=self.source,
2843
 
                               last_revision=revision_id,
2844
 
                               pb=pb, find_ghosts=find_ghosts)
2845
 
        return f.count_copied, f.failed_revisions
2846
 
 
2847
2877
    @needs_read_lock
2848
2878
    def search_missing_revision_ids(self, revision_id=None, find_ghosts=True):
2849
2879
        """See InterRepository.missing_revision_ids()."""
2913
2943
            return False
2914
2944
        return are_knits and InterRepository._same_model(source, target)
2915
2945
 
2916
 
    @needs_write_lock
2917
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
2918
 
        """See InterRepository.fetch()."""
2919
 
        from bzrlib.fetch import RepoFetcher
2920
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
2921
 
               self.source, self.source._format, self.target, self.target._format)
2922
 
        f = RepoFetcher(to_repository=self.target,
2923
 
                            from_repository=self.source,
2924
 
                            last_revision=revision_id,
2925
 
                            pb=pb, find_ghosts=find_ghosts)
2926
 
        return f.count_copied, f.failed_revisions
2927
 
 
2928
2946
    @needs_read_lock
2929
2947
    def search_missing_revision_ids(self, revision_id=None, find_ghosts=True):
2930
2948
        """See InterRepository.missing_revision_ids()."""
2984
3002
        return are_packs and InterRepository._same_model(source, target)
2985
3003
 
2986
3004
    @needs_write_lock
2987
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
 
3005
    def fetch(self, revision_id=None, pb=None, find_ghosts=False,
 
3006
            fetch_spec=None):
2988
3007
        """See InterRepository.fetch()."""
2989
3008
        if (len(self.source._fallback_repositories) > 0 or
2990
3009
            len(self.target._fallback_repositories) > 0):
2994
3013
            # attributes on repository.
2995
3014
            from bzrlib.fetch import RepoFetcher
2996
3015
            fetcher = RepoFetcher(self.target, self.source, revision_id,
2997
 
                                  pb, find_ghosts)
2998
 
            return fetcher.count_copied, fetcher.failed_revisions
2999
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
3000
 
               self.source, self.source._format, self.target, self.target._format)
3001
 
        self.count_copied = 0
 
3016
                    pb, find_ghosts, fetch_spec=fetch_spec)
 
3017
        if fetch_spec is not None:
 
3018
            if len(list(fetch_spec.heads)) != 1:
 
3019
                raise AssertionError(
 
3020
                    "InterPackRepo.fetch doesn't support "
 
3021
                    "fetching multiple heads yet.")
 
3022
            revision_id = fetch_spec.heads[0]
 
3023
            fetch_spec = None
3002
3024
        if revision_id is None:
3003
3025
            # TODO:
3004
3026
            # everything to do - use pack logic
3007
3029
            # till then:
3008
3030
            source_revision_ids = frozenset(self.source.all_revision_ids())
3009
3031
            revision_ids = source_revision_ids - \
3010
 
                frozenset(self.target_get_parent_map(source_revision_ids))
 
3032
                frozenset(self.target.get_parent_map(source_revision_ids))
3011
3033
            revision_keys = [(revid,) for revid in revision_ids]
3012
 
            target_pack_collection = self._get_target_pack_collection()
3013
 
            index = target_pack_collection.revision_index.combined_index
 
3034
            index = self.target._pack_collection.revision_index.combined_index
3014
3035
            present_revision_ids = set(item[1][0] for item in
3015
3036
                index.iter_entries(revision_keys))
3016
3037
            revision_ids = set(revision_ids) - present_revision_ids
3036
3057
 
3037
3058
    def _pack(self, source, target, revision_ids):
3038
3059
        from bzrlib.repofmt.pack_repo import Packer
3039
 
        target_pack_collection = self._get_target_pack_collection()
3040
3060
        packs = source._pack_collection.all_packs()
3041
 
        pack = Packer(target_pack_collection, packs, '.fetch',
 
3061
        pack = Packer(self.target._pack_collection, packs, '.fetch',
3042
3062
            revision_ids).pack()
3043
3063
        if pack is not None:
3044
 
            target_pack_collection._save_pack_names()
 
3064
            self.target._pack_collection._save_pack_names()
3045
3065
            copied_revs = pack.get_revision_count()
3046
3066
            # Trigger an autopack. This may duplicate effort as we've just done
3047
3067
            # a pack creation, but for now it is simpler to think about as
3048
3068
            # 'upload data, then repack if needed'.
3049
 
            self._autopack()
 
3069
            self.target._pack_collection.autopack()
3050
3070
            return (copied_revs, [])
3051
3071
        else:
3052
3072
            return (0, [])
3053
3073
 
3054
 
    def _autopack(self):
3055
 
        self.target._pack_collection.autopack()
3056
 
 
3057
 
    def _get_target_pack_collection(self):
3058
 
        return self.target._pack_collection
3059
 
 
3060
3074
    @needs_read_lock
3061
3075
    def search_missing_revision_ids(self, revision_id=None, find_ghosts=True):
3062
3076
        """See InterRepository.missing_revision_ids().
3069
3083
        elif revision_id is not None:
3070
3084
            # Find ghosts: search for revisions pointing from one repository to
3071
3085
            # the other, and vice versa, anywhere in the history of revision_id.
3072
 
            graph = self.target_get_graph(other_repository=self.source)
 
3086
            graph = self.target.get_graph(other_repository=self.source)
3073
3087
            searcher = graph._make_breadth_first_searcher([revision_id])
3074
3088
            found_ids = set()
3075
3089
            while True:
3085
3099
            # Double query here: should be able to avoid this by changing the
3086
3100
            # graph api further.
3087
3101
            result_set = found_ids - frozenset(
3088
 
                self.target_get_parent_map(found_ids))
 
3102
                self.target.get_parent_map(found_ids))
3089
3103
        else:
3090
3104
            source_ids = self.source.all_revision_ids()
3091
3105
            # source_ids is the worst possible case we may need to pull.
3097
3111
        return self.source.revision_ids_to_search_result(result_set)
3098
3112
 
3099
3113
 
3100
 
class InterModel1and2(InterRepository):
3101
 
 
3102
 
    @classmethod
3103
 
    def _get_repo_format_to_test(self):
3104
 
        return None
3105
 
 
3106
 
    @staticmethod
3107
 
    def is_compatible(source, target):
3108
 
        if not source.supports_rich_root() and target.supports_rich_root():
3109
 
            return True
3110
 
        else:
3111
 
            return False
3112
 
 
3113
 
    @needs_write_lock
3114
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3115
 
        """See InterRepository.fetch()."""
3116
 
        from bzrlib.fetch import Model1toKnit2Fetcher
3117
 
        f = Model1toKnit2Fetcher(to_repository=self.target,
3118
 
                                 from_repository=self.source,
3119
 
                                 last_revision=revision_id,
3120
 
                                 pb=pb, find_ghosts=find_ghosts)
3121
 
        return f.count_copied, f.failed_revisions
3122
 
 
3123
 
    @needs_write_lock
3124
 
    def copy_content(self, revision_id=None):
3125
 
        """Make a complete copy of the content in self into destination.
3126
 
 
3127
 
        This is a destructive operation! Do not use it on existing
3128
 
        repositories.
3129
 
 
3130
 
        :param revision_id: Only copy the content needed to construct
3131
 
                            revision_id and its parents.
3132
 
        """
3133
 
        try:
3134
 
            self.target.set_make_working_trees(self.source.make_working_trees())
3135
 
        except NotImplementedError:
3136
 
            pass
3137
 
        # but don't bother fetching if we have the needed data now.
3138
 
        if (revision_id not in (None, _mod_revision.NULL_REVISION) and
3139
 
            self.target.has_revision(revision_id)):
3140
 
            return
3141
 
        self.target.fetch(self.source, revision_id=revision_id)
3142
 
 
3143
 
 
3144
 
class InterKnit1and2(InterKnitRepo):
3145
 
 
3146
 
    @classmethod
3147
 
    def _get_repo_format_to_test(self):
3148
 
        return None
3149
 
 
3150
 
    @staticmethod
3151
 
    def is_compatible(source, target):
3152
 
        """Be compatible with Knit1 source and Knit3 target"""
3153
 
        try:
3154
 
            from bzrlib.repofmt.knitrepo import (
3155
 
                RepositoryFormatKnit1,
3156
 
                RepositoryFormatKnit3,
3157
 
                )
3158
 
            from bzrlib.repofmt.pack_repo import (
3159
 
                RepositoryFormatKnitPack1,
3160
 
                RepositoryFormatKnitPack3,
3161
 
                RepositoryFormatKnitPack4,
3162
 
                RepositoryFormatKnitPack5,
3163
 
                RepositoryFormatKnitPack5RichRoot,
3164
 
                RepositoryFormatKnitPack6,
3165
 
                RepositoryFormatKnitPack6RichRoot,
3166
 
                RepositoryFormatPackDevelopment2,
3167
 
                RepositoryFormatPackDevelopment2Subtree,
3168
 
                )
3169
 
            norichroot = (
3170
 
                RepositoryFormatKnit1,            # no rr, no subtree
3171
 
                RepositoryFormatKnitPack1,        # no rr, no subtree
3172
 
                RepositoryFormatPackDevelopment2, # no rr, no subtree
3173
 
                RepositoryFormatKnitPack5,        # no rr, no subtree
3174
 
                RepositoryFormatKnitPack6,        # no rr, no subtree
3175
 
                )
3176
 
            richroot = (
3177
 
                RepositoryFormatKnit3,            # rr, subtree
3178
 
                RepositoryFormatKnitPack3,        # rr, subtree
3179
 
                RepositoryFormatKnitPack4,        # rr, no subtree
3180
 
                RepositoryFormatKnitPack5RichRoot,# rr, no subtree
3181
 
                RepositoryFormatKnitPack6RichRoot,# rr, no subtree
3182
 
                RepositoryFormatPackDevelopment2Subtree, # rr, subtree
3183
 
                )
3184
 
            for format in norichroot:
3185
 
                if format.rich_root_data:
3186
 
                    raise AssertionError('Format %s is a rich-root format'
3187
 
                        ' but is included in the non-rich-root list'
3188
 
                        % (format,))
3189
 
            for format in richroot:
3190
 
                if not format.rich_root_data:
3191
 
                    raise AssertionError('Format %s is not a rich-root format'
3192
 
                        ' but is included in the rich-root list'
3193
 
                        % (format,))
3194
 
            # TODO: One alternative is to just check format.rich_root_data,
3195
 
            #       instead of keeping membership lists. However, the formats
3196
 
            #       *also* have to use the same 'Knit' style of storage
3197
 
            #       (line-deltas, fulltexts, etc.)
3198
 
            return (isinstance(source._format, norichroot) and
3199
 
                    isinstance(target._format, richroot))
3200
 
        except AttributeError:
3201
 
            return False
3202
 
 
3203
 
    @needs_write_lock
3204
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3205
 
        """See InterRepository.fetch()."""
3206
 
        from bzrlib.fetch import Knit1to2Fetcher
3207
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
3208
 
               self.source, self.source._format, self.target,
3209
 
               self.target._format)
3210
 
        f = Knit1to2Fetcher(to_repository=self.target,
3211
 
                            from_repository=self.source,
3212
 
                            last_revision=revision_id,
3213
 
                            pb=pb, find_ghosts=find_ghosts)
3214
 
        return f.count_copied, f.failed_revisions
3215
 
 
3216
 
 
3217
3114
class InterDifferingSerializer(InterKnitRepo):
3218
3115
 
3219
3116
    @classmethod
3280
3177
                        # We don't copy the text for the root node unless the
3281
3178
                        # target supports_rich_root.
3282
3179
                        continue
3283
 
                    # TODO: Do we need:
3284
 
                    #       "if entry.revision == current_revision_id" ?
3285
 
                    if entry.revision == current_revision_id:
3286
 
                        text_keys.add((file_id, entry.revision))
 
3180
                    text_keys.add((file_id, entry.revision))
3287
3181
            revision = self.source.get_revision(current_revision_id)
3288
3182
            pending_deltas.append((basis_id, delta,
3289
3183
                current_revision_id, revision.parent_ids))
3294
3188
        from_texts = self.source.texts
3295
3189
        to_texts = self.target.texts
3296
3190
        to_texts.insert_record_stream(from_texts.get_record_stream(
3297
 
            text_keys, self.target._fetch_order,
3298
 
            not self.target._fetch_uses_deltas))
 
3191
            text_keys, self.target._format._fetch_order,
 
3192
            not self.target._format._fetch_uses_deltas))
3299
3193
        # insert deltas
3300
3194
        for delta in pending_deltas:
3301
3195
            self.target.add_inventory_by_delta(*delta)
3340
3234
                  len(revision_ids))
3341
3235
 
3342
3236
    @needs_write_lock
3343
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
 
3237
    def fetch(self, revision_id=None, pb=None, find_ghosts=False,
 
3238
            fetch_spec=None):
3344
3239
        """See InterRepository.fetch()."""
 
3240
        if fetch_spec is not None:
 
3241
            raise AssertionError("Not implemented yet...")
3345
3242
        revision_ids = self.target.search_missing_revision_ids(self.source,
3346
3243
            revision_id, find_ghosts=find_ghosts).get_keys()
3347
3244
        if not revision_ids:
3352
3249
            my_pb = ui.ui_factory.nested_progress_bar()
3353
3250
            pb = my_pb
3354
3251
        else:
 
3252
            symbol_versioning.warn(
 
3253
                symbol_versioning.deprecated_in((1, 14, 0))
 
3254
                % "pb parameter to fetch()")
3355
3255
            my_pb = None
3356
3256
        try:
3357
3257
            self._fetch_all_revisions(revision_ids, pb)
3383
3283
        return basis_id, basis_tree
3384
3284
 
3385
3285
 
3386
 
class InterOtherToRemote(InterRepository):
3387
 
    """An InterRepository that simply delegates to the 'real' InterRepository
3388
 
    calculated for (source, target._real_repository).
3389
 
    """
3390
 
 
3391
 
    _walk_to_common_revisions_batch_size = 50
3392
 
 
3393
 
    def __init__(self, source, target):
3394
 
        InterRepository.__init__(self, source, target)
3395
 
        self._real_inter = None
3396
 
 
3397
 
    @staticmethod
3398
 
    def is_compatible(source, target):
3399
 
        if isinstance(target, remote.RemoteRepository):
3400
 
            return True
3401
 
        return False
3402
 
 
3403
 
    def _ensure_real_inter(self):
3404
 
        if self._real_inter is None:
3405
 
            self.target._ensure_real()
3406
 
            real_target = self.target._real_repository
3407
 
            self._real_inter = InterRepository.get(self.source, real_target)
3408
 
            # Make _real_inter use the RemoteRepository for get_parent_map
3409
 
            self._real_inter.target_get_graph = self.target.get_graph
3410
 
            self._real_inter.target_get_parent_map = self.target.get_parent_map
3411
 
 
3412
 
    def copy_content(self, revision_id=None):
3413
 
        self._ensure_real_inter()
3414
 
        self._real_inter.copy_content(revision_id=revision_id)
3415
 
 
3416
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3417
 
        self._ensure_real_inter()
3418
 
        return self._real_inter.fetch(revision_id=revision_id, pb=pb,
3419
 
            find_ghosts=find_ghosts)
3420
 
 
3421
 
    @classmethod
3422
 
    def _get_repo_format_to_test(self):
3423
 
        return None
3424
 
 
3425
 
 
3426
 
class InterRemoteToOther(InterRepository):
3427
 
 
3428
 
    def __init__(self, source, target):
3429
 
        InterRepository.__init__(self, source, target)
3430
 
        self._real_inter = None
3431
 
 
3432
 
    @staticmethod
3433
 
    def is_compatible(source, target):
3434
 
        if not isinstance(source, remote.RemoteRepository):
3435
 
            return False
3436
 
        # Is source's model compatible with target's model?
3437
 
        source._ensure_real()
3438
 
        real_source = source._real_repository
3439
 
        if isinstance(real_source, remote.RemoteRepository):
3440
 
            raise NotImplementedError(
3441
 
                "We don't support remote repos backed by remote repos yet.")
3442
 
        return InterRepository._same_model(real_source, target)
3443
 
 
3444
 
    def _ensure_real_inter(self):
3445
 
        if self._real_inter is None:
3446
 
            self.source._ensure_real()
3447
 
            real_source = self.source._real_repository
3448
 
            self._real_inter = InterRepository.get(real_source, self.target)
3449
 
 
3450
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3451
 
        self._ensure_real_inter()
3452
 
        return self._real_inter.fetch(revision_id=revision_id, pb=pb,
3453
 
            find_ghosts=find_ghosts)
3454
 
 
3455
 
    def copy_content(self, revision_id=None):
3456
 
        self._ensure_real_inter()
3457
 
        self._real_inter.copy_content(revision_id=revision_id)
3458
 
 
3459
 
    @classmethod
3460
 
    def _get_repo_format_to_test(self):
3461
 
        return None
3462
 
 
3463
 
 
3464
 
 
3465
 
class InterPackToRemotePack(InterPackRepo):
3466
 
    """A specialisation of InterPackRepo for a target that is a
3467
 
    RemoteRepository.
3468
 
 
3469
 
    This will use the get_parent_map RPC rather than plain readvs, and also
3470
 
    uses an RPC for autopacking.
3471
 
    """
3472
 
 
3473
 
    _walk_to_common_revisions_batch_size = 50
3474
 
 
3475
 
    @staticmethod
3476
 
    def is_compatible(source, target):
3477
 
        from bzrlib.repofmt.pack_repo import RepositoryFormatPack
3478
 
        if isinstance(source._format, RepositoryFormatPack):
3479
 
            if isinstance(target, remote.RemoteRepository):
3480
 
                target._ensure_real()
3481
 
                if isinstance(target._real_repository._format,
3482
 
                              RepositoryFormatPack):
3483
 
                    if InterRepository._same_model(source, target):
3484
 
                        return True
3485
 
        return False
3486
 
 
3487
 
    def _autopack(self):
3488
 
        self.target.autopack()
3489
 
 
3490
 
    @needs_write_lock
3491
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3492
 
        """See InterRepository.fetch()."""
3493
 
        # Always fetch using the generic streaming fetch code, to allow
3494
 
        # streaming fetching into remote servers.
3495
 
        from bzrlib.fetch import RepoFetcher
3496
 
        fetcher = RepoFetcher(self.target, self.source, revision_id,
3497
 
                              pb, find_ghosts)
3498
 
        return fetcher.count_copied, fetcher.failed_revisions
3499
 
 
3500
 
    def _get_target_pack_collection(self):
3501
 
        return self.target._real_repository._pack_collection
3502
 
 
3503
 
    @classmethod
3504
 
    def _get_repo_format_to_test(self):
3505
 
        return None
3506
 
 
3507
 
 
3508
3286
InterRepository.register_optimiser(InterDifferingSerializer)
3509
3287
InterRepository.register_optimiser(InterSameDataRepository)
3510
3288
InterRepository.register_optimiser(InterWeaveRepo)
3511
3289
InterRepository.register_optimiser(InterKnitRepo)
3512
 
InterRepository.register_optimiser(InterModel1and2)
3513
 
InterRepository.register_optimiser(InterKnit1and2)
3514
3290
InterRepository.register_optimiser(InterPackRepo)
3515
 
InterRepository.register_optimiser(InterOtherToRemote)
3516
 
InterRepository.register_optimiser(InterRemoteToOther)
3517
 
InterRepository.register_optimiser(InterPackToRemotePack)
3518
3291
 
3519
3292
 
3520
3293
class CopyConverter(object):
3601
3374
 
3602
3375
class _VersionedFileChecker(object):
3603
3376
 
3604
 
    def __init__(self, repository):
 
3377
    def __init__(self, repository, text_key_references=None):
3605
3378
        self.repository = repository
3606
 
        self.text_index = self.repository._generate_text_key_index()
 
3379
        self.text_index = self.repository._generate_text_key_index(
 
3380
            text_key_references=text_key_references)
3607
3381
 
3608
3382
    def calculate_file_version_parents(self, text_key):
3609
3383
        """Calculate the correct parents for a file version according to
3783
3557
            self.target_repo.add_revision(revision_id, rev)
3784
3558
 
3785
3559
    def finished(self):
3786
 
        if self.target_repo._fetch_reconcile:
 
3560
        if self.target_repo._format._fetch_reconcile:
3787
3561
            self.target_repo.reconcile()
3788
3562
 
 
3563
 
 
3564
class StreamSource(object):
 
3565
    """A source of a stream for fetching between repositories."""
 
3566
 
 
3567
    def __init__(self, from_repository, to_format):
 
3568
        """Create a StreamSource streaming from from_repository."""
 
3569
        self.from_repository = from_repository
 
3570
        self.to_format = to_format
 
3571
 
 
3572
    def delta_on_metadata(self):
 
3573
        """Return True if delta's are permitted on metadata streams.
 
3574
 
 
3575
        That is on revisions and signatures.
 
3576
        """
 
3577
        src_serializer = self.from_repository._format._serializer
 
3578
        target_serializer = self.to_format._serializer
 
3579
        return (self.to_format._fetch_uses_deltas and
 
3580
            src_serializer == target_serializer)
 
3581
 
 
3582
    def _fetch_revision_texts(self, revs):
 
3583
        # fetch signatures first and then the revision texts
 
3584
        # may need to be a InterRevisionStore call here.
 
3585
        from_sf = self.from_repository.signatures
 
3586
        # A missing signature is just skipped.
 
3587
        keys = [(rev_id,) for rev_id in revs]
 
3588
        signatures = versionedfile.filter_absent(from_sf.get_record_stream(
 
3589
            keys,
 
3590
            self.to_format._fetch_order,
 
3591
            not self.to_format._fetch_uses_deltas))
 
3592
        # If a revision has a delta, this is actually expanded inside the
 
3593
        # insert_record_stream code now, which is an alternate fix for
 
3594
        # bug #261339
 
3595
        from_rf = self.from_repository.revisions
 
3596
        revisions = from_rf.get_record_stream(
 
3597
            keys,
 
3598
            self.to_format._fetch_order,
 
3599
            not self.delta_on_metadata())
 
3600
        return [('signatures', signatures), ('revisions', revisions)]
 
3601
 
 
3602
    def _generate_root_texts(self, revs):
 
3603
        """This will be called by __fetch between fetching weave texts and
 
3604
        fetching the inventory weave.
 
3605
 
 
3606
        Subclasses should override this if they need to generate root texts
 
3607
        after fetching weave texts.
 
3608
        """
 
3609
        if self._rich_root_upgrade():
 
3610
            import bzrlib.fetch
 
3611
            return bzrlib.fetch.Inter1and2Helper(
 
3612
                self.from_repository).generate_root_texts(revs)
 
3613
        else:
 
3614
            return []
 
3615
 
 
3616
    def get_stream(self, search):
 
3617
        phase = 'file'
 
3618
        revs = search.get_keys()
 
3619
        graph = self.from_repository.get_graph()
 
3620
        revs = list(graph.iter_topo_order(revs))
 
3621
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
 
3622
        text_keys = []
 
3623
        for knit_kind, file_id, revisions in data_to_fetch:
 
3624
            if knit_kind != phase:
 
3625
                phase = knit_kind
 
3626
                # Make a new progress bar for this phase
 
3627
            if knit_kind == "file":
 
3628
                # Accumulate file texts
 
3629
                text_keys.extend([(file_id, revision) for revision in
 
3630
                    revisions])
 
3631
            elif knit_kind == "inventory":
 
3632
                # Now copy the file texts.
 
3633
                from_texts = self.from_repository.texts
 
3634
                yield ('texts', from_texts.get_record_stream(
 
3635
                    text_keys, self.to_format._fetch_order,
 
3636
                    not self.to_format._fetch_uses_deltas))
 
3637
                # Cause an error if a text occurs after we have done the
 
3638
                # copy.
 
3639
                text_keys = None
 
3640
                # Before we process the inventory we generate the root
 
3641
                # texts (if necessary) so that the inventories references
 
3642
                # will be valid.
 
3643
                for _ in self._generate_root_texts(revs):
 
3644
                    yield _
 
3645
                # NB: This currently reopens the inventory weave in source;
 
3646
                # using a single stream interface instead would avoid this.
 
3647
                from_weave = self.from_repository.inventories
 
3648
                # we fetch only the referenced inventories because we do not
 
3649
                # know for unselected inventories whether all their required
 
3650
                # texts are present in the other repository - it could be
 
3651
                # corrupt.
 
3652
                yield ('inventories', from_weave.get_record_stream(
 
3653
                    [(rev_id,) for rev_id in revs],
 
3654
                    self.inventory_fetch_order(),
 
3655
                    not self.delta_on_metadata()))
 
3656
            elif knit_kind == "signatures":
 
3657
                # Nothing to do here; this will be taken care of when
 
3658
                # _fetch_revision_texts happens.
 
3659
                pass
 
3660
            elif knit_kind == "revisions":
 
3661
                for record in self._fetch_revision_texts(revs):
 
3662
                    yield record
 
3663
            else:
 
3664
                raise AssertionError("Unknown knit kind %r" % knit_kind)
 
3665
 
 
3666
    def get_stream_for_missing_keys(self, missing_keys):
 
3667
        # missing keys can only occur when we are byte copying and not
 
3668
        # translating (because translation means we don't send
 
3669
        # unreconstructable deltas ever).
 
3670
        keys = {}
 
3671
        keys['texts'] = set()
 
3672
        keys['revisions'] = set()
 
3673
        keys['inventories'] = set()
 
3674
        keys['signatures'] = set()
 
3675
        for key in missing_keys:
 
3676
            keys[key[0]].add(key[1:])
 
3677
        if len(keys['revisions']):
 
3678
            # If we allowed copying revisions at this point, we could end up
 
3679
            # copying a revision without copying its required texts: a
 
3680
            # violation of the requirements for repository integrity.
 
3681
            raise AssertionError(
 
3682
                'cannot copy revisions to fill in missing deltas %s' % (
 
3683
                    keys['revisions'],))
 
3684
        for substream_kind, keys in keys.iteritems():
 
3685
            vf = getattr(self.from_repository, substream_kind)
 
3686
            # Ask for full texts always so that we don't need more round trips
 
3687
            # after this stream.
 
3688
            stream = vf.get_record_stream(keys,
 
3689
                self.to_format._fetch_order, True)
 
3690
            yield substream_kind, stream
 
3691
 
 
3692
    def inventory_fetch_order(self):
 
3693
        if self._rich_root_upgrade():
 
3694
            return 'topological'
 
3695
        else:
 
3696
            return self.to_format._fetch_order
 
3697
 
 
3698
    def _rich_root_upgrade(self):
 
3699
        return (not self.from_repository._format.rich_root_data and
 
3700
            self.to_format.rich_root_data)
 
3701