/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to breezy/bzr/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

=== modified file 'breezy/bzr/groupcompress_repo.py'

@@ -16,8 +16,6 @@
 
 """Repository formats using CHK inventories and groupcompress compression."""
 
-from __future__ import absolute_import
-
 import time
 
 from .. import (
@@ -50,7 +48,7 @@
     Pack,
     NewPack,
     PackRepository,
-    PackRootCommitBuilder,
+    PackCommitBuilder,
     RepositoryPackCollection,
     RepositoryFormatPack,
     ResumedPack,
@@ -59,10 +57,6 @@
 from ..bzr.vf_repository import (
     StreamSource,
     )
-from ..sixish import (
-    viewitems,
-    viewvalues,
-    )
 from ..static_tuple import StaticTuple
 
 
@@ -91,22 +85,22 @@
         else:
             chk_index = None
         Pack.__init__(self,
-            # Revisions: parents list, no text compression.
-            index_builder_class(reference_lists=1),
-            # Inventory: We want to map compression only, but currently the
-            # knit code hasn't been updated enough to understand that, so we
-            # have a regular 2-list index giving parents and compression
-            # source.
-            index_builder_class(reference_lists=1),
-            # Texts: per file graph, for all fileids - so one reference list
-            # and two elements in the key tuple.
-            index_builder_class(reference_lists=1, key_elements=2),
-            # Signatures: Just blobs to store, no compression, no parents
-            # listing.
-            index_builder_class(reference_lists=0),
-            # CHK based storage - just blobs, no compression or parents.
-            chk_index=chk_index
-            )
+                      # Revisions: parents list, no text compression.
+                      index_builder_class(reference_lists=1),
+                      # Inventory: We want to map compression only, but currently the
+                      # knit code hasn't been updated enough to understand that, so we
+                      # have a regular 2-list index giving parents and compression
+                      # source.
+                      index_builder_class(reference_lists=1),
+                      # Texts: per file graph, for all fileids - so one reference list
+                      # and two elements in the key tuple.
+                      index_builder_class(reference_lists=1, key_elements=2),
+                      # Signatures: Just blobs to store, no compression, no parents
+                      # listing.
+                      index_builder_class(reference_lists=0),
+                      # CHK based storage - just blobs, no compression or parents.
+                      chk_index=chk_index
+                      )
         self._pack_collection = pack_collection
         # When we make readonly indices, we need this.
         self.index_class = pack_collection._index_class
@@ -137,8 +131,8 @@
             self.random_name, mode=self._file_mode)
         if 'pack' in debug.debug_flags:
             trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
-                time.ctime(), self.upload_transport.base, self.random_name,
-                time.time() - self.start_time)
+                         time.ctime(), self.upload_transport.base, self.random_name,
+                         time.time() - self.start_time)
         # A list of byte sequences to be written to the new pack, and the
         # aggregate size of them.  Stored as a list rather than separate
         # variables so that the _write_data closure below can update them.
@@ -148,8 +142,9 @@
         # robertc says- this is a closure rather than a method on the object
         # so that the variables are locals, and faster than accessing object
        # members.
+
         def _write_data(data, flush=False, _buffer=self._buffer,
-            _write=self.write_stream.write, _update=self._hash.update):
+                        _write=self.write_stream.write, _update=self._hash.update):
             _buffer[0].append(data)
             _buffer[1] += len(data)
             # buffer cap
@@ -204,7 +199,8 @@
         self._pack_collection = pack_collection
         # ATM, We only support this for GCCHK repositories
         if pack_collection.chk_index is None:
-            raise AssertionError('pack_collection.chk_index should not be None')
+            raise AssertionError(
+                'pack_collection.chk_index should not be None')
         self._gather_text_refs = False
         self._chk_id_roots = []
         self._chk_p_id_roots = []
@@ -214,7 +210,8 @@
 
     def _get_progress_stream(self, source_vf, keys, message, pb):
         def pb_stream():
-            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
+            substream = source_vf.get_record_stream(
+                keys, 'groupcompress', True)
             for idx, record in enumerate(substream):
                 if pb is not None:
                     pb.update(message, idx + 1, len(keys))
@@ -224,15 +221,16 @@
     def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
         """Filter the texts of inventories, to find the chk pages."""
         total_keys = len(keys)
+
         def _filtered_inv_stream():
             id_roots_set = set()
             p_id_roots_set = set()
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                bytes = record.get_bytes_as('fulltext')
-                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
-                                                             record.key)
+                lines = record.get_bytes_as('lines')
+                chk_inv = inventory.CHKInventory.deserialise(
+                    None, lines, record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 key = chk_inv.id_to_entry.key()
@@ -274,14 +272,16 @@
         counter = [0]
         if self._gather_text_refs:
             self._text_refs = set()
+
         def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
             cur_keys = root_keys
             while cur_keys:
                 keys_by_search_prefix = {}
                 remaining_keys.difference_update(cur_keys)
                 next_keys = set()
+
                 def handle_internal_node(node):
-                    for prefix, value in viewitems(node._items):
+                    for prefix, value in node._items.items():
                         # We don't want to request the same key twice, and we
                         # want to order it by the first time it is seen.
                         # Even further, we don't want to request a key which is
@@ -293,13 +293,15 @@
                         #       always fill them in for stacked branches
                         if value not in next_keys and value in remaining_keys:
                             keys_by_search_prefix.setdefault(prefix,
-                                []).append(value)
+                                                             []).append(value)
                             next_keys.add(value)
+
                 def handle_leaf_node(node):
                     # Store is None, because we know we have a LeafNode, and we
                     # just want its entries
                     for file_id, bytes in node.iteritems(None):
                         self._text_refs.add(chk_map._bytes_to_text_key(bytes))
+
                 def next_stream():
                     stream = source_vf.get_record_stream(cur_keys,
                                                          'as-requested', True)
@@ -402,15 +404,11 @@
                      pb_offset):
         trace.mutter('repacking %d %s', len(keys), message)
         self.pb.update('repacking %s' % (message,), pb_offset)
-        child_pb = ui.ui_factory.nested_progress_bar()
-        try:
+        with ui.ui_factory.nested_progress_bar() as child_pb:
             stream = vf_to_stream(source_vf, keys, message, child_pb)
-            for _ in target_vf._insert_record_stream(stream,
-                                                     random_id=True,
-                                                     reuse_blocks=False):
+            for _, _ in target_vf._insert_record_stream(
+                    stream, random_id=True, reuse_blocks=False):
                 pass
-        finally:
-            child_pb.finished()
 
     def _copy_revision_texts(self):
         source_vf, target_vf = self._build_vfs('revision', True, False)
@@ -428,7 +426,8 @@
         # get_parent_map(self.revision_keys), but that shouldn't be any faster
         # than this.
         inventory_keys = source_vf.keys()
-        missing_inventories = set(self.revision_keys).difference(inventory_keys)
+        missing_inventories = set(
+            self.revision_keys).difference(inventory_keys)
         if missing_inventories:
             # Go back to the original repo, to see if these are really missing
             # https://bugs.launchpad.net/bzr/+bug/437003
@@ -441,7 +440,7 @@
             if really_missing:
                 missing_inventories = sorted(really_missing)
                 raise ValueError('We are missing inventories for revisions: %s'
-                    % (missing_inventories,))
+                                 % (missing_inventories,))
         self._copy_stream(source_vf, target_vf, inventory_keys,
                           'inventories', self._get_filtered_inv_stream, 2)
 
@@ -458,16 +457,12 @@
                      len(self._chk_id_roots), len(self._chk_p_id_roots),
                      len(total_keys))
         self.pb.update('repacking chk', 3)
-        child_pb = ui.ui_factory.nested_progress_bar()
-        try:
+        with ui.ui_factory.nested_progress_bar() as child_pb:
             for stream in self._get_chk_streams(source_vf, total_keys,
                                                 pb=child_pb):
-                for _ in target_vf._insert_record_stream(stream,
-                                                         random_id=True,
-                                                         reuse_blocks=False):
+                for _, _ in target_vf._insert_record_stream(
+                        stream, random_id=True, reuse_blocks=False):
                     pass
-        finally:
-            child_pb.finished()
 
     def _copy_text_texts(self):
         source_vf, target_vf = self._build_vfs('text', True, True)
@@ -491,7 +486,7 @@
         self.pb.update('repacking', 0, 7)
         self.new_pack = self.open_pack()
         # Is this necessary for GC ?
-        self.new_pack.set_write_cache_size(1024*1024)
+        self.new_pack.set_write_cache_size(1024 * 1024)
         self._copy_revision_texts()
         self._copy_inventory_texts()
         self._copy_chk_texts()
@@ -507,7 +502,7 @@
             if old_pack.name == self.new_pack._hash.hexdigest():
                 # The single old pack was already optimally packed.
                 trace.mutter('single pack %s was already optimally packed',
-                    old_pack.name)
+                             old_pack.name)
                 self.new_pack.abort()
                 return None
         self.pb.update('finishing repack', 6, 7)
@@ -548,7 +543,7 @@
         ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
         # Strip keys back into revision_ids.
         ancestors = dict((k[0], tuple([p[0] for p in parents]))
-                         for k, parents in viewitems(ancestor_keys))
+                         for k, parents in ancestor_keys.items())
         del ancestor_keys
         # TODO: _generate_text_key_index should be much cheaper to generate from
         #       a chk repository, rather than the current implementation
@@ -557,7 +552,7 @@
         # 2) generate a keys list that contains all the entries that can
         #    be used as-is, with corrected parents.
         ok_keys = []
-        new_parent_keys = {} # (key, parent_keys)
+        new_parent_keys = {}  # (key, parent_keys)
         discarded_keys = []
         NULL_REVISION = _mod_revision.NULL_REVISION
         for key in self._text_refs:
@@ -587,9 +582,10 @@
         del ideal_index
         del file_id_parent_map
         # 3) bulk copy the data, updating records than need it
+
         def _update_parents_for_texts():
             stream = source_vf.get_record_stream(self._text_refs,
-                'groupcompress', False)
+                                                 'groupcompress', False)
             for record in stream:
                 if record.key in new_parent_keys:
                     record.parents = new_parent_keys[record.key]
@@ -603,7 +599,7 @@
 
 class GCCHKCanonicalizingPacker(GCCHKPacker):
     """A packer that ensures inventories have canonical-form CHK maps.
-    
+
     Ideally this would be part of reconcile, but it's very slow and rarely
     needed.  (It repairs repositories affected by
     https://bugs.launchpad.net/bzr/+bug/522637).
@@ -619,11 +615,8 @@
         This is useful to get the side-effects of generating a stream.
         """
         self.pb.update('scanning %s' % (message,), pb_offset)
-        child_pb = ui.ui_factory.nested_progress_bar()
-        try:
+        with ui.ui_factory.nested_progress_bar() as child_pb:
             list(vf_to_stream(source_vf, keys, message, child_pb))
-        finally:
-            child_pb.finished()
 
     def _copy_inventory_texts(self):
         source_vf, target_vf = self._build_vfs('inventory', True, True)
@@ -632,13 +625,14 @@
         # First, copy the existing CHKs on the assumption that most of them
         # will be correct.  This will save us from having to reinsert (and
         # recompress) these records later at the cost of perhaps preserving a
-        # few unused CHKs. 
+        # few unused CHKs.
         # (Iterate but don't insert _get_filtered_inv_stream to populate the
         # variables needed by GCCHKPacker._copy_chk_texts.)
         self._exhaust_stream(source_vf, inventory_keys, 'inventories',
-                self._get_filtered_inv_stream, 2)
+                             self._get_filtered_inv_stream, 2)
         GCCHKPacker._copy_chk_texts(self)
         # Now copy and fix the inventories, and any regenerated CHKs.
+
         def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
             return self._get_filtered_canonicalizing_inv_stream(
                 source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
@@ -650,27 +644,28 @@
         pass
 
     def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
-            pb=None, source_chk_vf=None, target_chk_vf=None):
+                                                pb=None, source_chk_vf=None, target_chk_vf=None):
         """Filter the texts of inventories, regenerating CHKs to make sure they
         are canonical.
         """
         total_keys = len(keys)
         target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
+
         def _filtered_inv_stream():
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             search_key_name = None
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                bytes = record.get_bytes_as('fulltext')
+                lines = record.get_bytes_as('lines')
                 chk_inv = inventory.CHKInventory.deserialise(
-                    source_chk_vf, bytes, record.key)
+                    source_chk_vf, lines, record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 chk_inv.id_to_entry._ensure_root()
                 if search_key_name is None:
                     # Find the name corresponding to the search_key_func
                     search_key_reg = chk_map.search_key_registry
-                    for search_key_name, func in viewitems(search_key_reg):
+                    for search_key_name, func in search_key_reg.items():
                         if func == chk_inv.id_to_entry._search_key_func:
                             break
                 canonical_inv = inventory.CHKInventory.from_inventory(
@@ -681,8 +676,9 @@
                     trace.mutter(
                         'Non-canonical CHK map for id_to_entry of inv: %s '
                         '(root is %s, should be %s)' % (chk_inv.revision_id,
-                        chk_inv.id_to_entry.key()[0],
-                        canonical_inv.id_to_entry.key()[0]))
+                                                        chk_inv.id_to_entry.key()[
+                                                            0],
+                                                        canonical_inv.id_to_entry.key()[0]))
                     self._data_changed = True
                 p_id_map = chk_inv.parent_id_basename_to_file_id
                 p_id_map._ensure_root()
@@ -694,9 +690,9 @@
                         % (chk_inv.revision_id, p_id_map.key()[0],
                            canon_p_id_map.key()[0]))
                     self._data_changed = True
-                yield versionedfile.ChunkedContentFactory(record.key,
-                        record.parents, record.sha1,
-                        canonical_inv.to_lines())
+                yield versionedfile.ChunkedContentFactory(
+                    record.key, record.parents, record.sha1, canonical_inv.to_lines(),
+                    chunks_are_lines=True)
             # We have finished processing all of the inventory records, we
             # don't need these sets anymore
         return _filtered_inv_stream()
@@ -740,13 +736,13 @@
         missing_corresponding.difference_update(corresponding_invs)
         if missing_corresponding:
             problems.append("inventories missing for revisions %s" %
-                (sorted(missing_corresponding),))
+                            (sorted(missing_corresponding),))
             return problems
         # Are any chk root entries missing for any inventories?  This includes
         # any present parent inventories, which may be used when calculating
         # deltas for streaming.
         all_inv_keys = set(corresponding_invs)
-        for parent_inv_keys in viewvalues(inv_parent_map):
+        for parent_inv_keys in inv_parent_map.values():
             all_inv_keys.update(parent_inv_keys)
         # Filter out ghost parents.
         all_inv_keys.intersection_update(
@@ -799,7 +795,7 @@
         missing_text_keys = text_keys.difference(present_text_keys)
         if missing_text_keys:
             problems.append("missing text keys: %r"
-                % (sorted(missing_text_keys),))
+                            % (sorted(missing_text_keys),))
         return problems
 
 
@@ -807,51 +803,53 @@
     """subclass of PackRepository that uses CHK based inventories."""
 
     def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
-        _serializer):
+                 _serializer):
         """Overridden to change pack collection class."""
         super(CHKInventoryRepository, self).__init__(_format, a_controldir,
-            control_files, _commit_builder_class, _serializer)
+                                                     control_files, _commit_builder_class, _serializer)
         index_transport = self._transport.clone('indices')
         self._pack_collection = GCRepositoryPackCollection(self,
-            self._transport, index_transport,
-            self._transport.clone('upload'),
-            self._transport.clone('packs'),
-            _format.index_builder_class,
-            _format.index_class,
-            use_chk_index=self._format.supports_chks,
-            )
+                                                           self._transport, index_transport,
+                                                           self._transport.clone(
+                                                               'upload'),
+                                                           self._transport.clone(
+                                                               'packs'),
+                                                           _format.index_builder_class,
+                                                           _format.index_class,
+                                                           use_chk_index=self._format.supports_chks,
+                                                           )
         self.inventories = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
-                add_callback=self._pack_collection.inventory_index.add_callback,
-                parents=True, is_locked=self.is_locked,
-                inconsistency_fatal=False),
+                          add_callback=self._pack_collection.inventory_index.add_callback,
+                          parents=True, is_locked=self.is_locked,
+                          inconsistency_fatal=False),
             access=self._pack_collection.inventory_index.data_access)
         self.revisions = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
-                add_callback=self._pack_collection.revision_index.add_callback,
-                parents=True, is_locked=self.is_locked,
-                track_external_parent_refs=True, track_new_keys=True),
+                          add_callback=self._pack_collection.revision_index.add_callback,
+                          parents=True, is_locked=self.is_locked,
+                          track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
-                add_callback=self._pack_collection.signature_index.add_callback,
-                parents=False, is_locked=self.is_locked,
-                inconsistency_fatal=False),
+                          add_callback=self._pack_collection.signature_index.add_callback,
+                          parents=False, is_locked=self.is_locked,
+                          inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
-                add_callback=self._pack_collection.text_index.add_callback,
-                parents=True, is_locked=self.is_locked,
-                inconsistency_fatal=False),
+                          add_callback=self._pack_collection.text_index.add_callback,
+                          parents=True, is_locked=self.is_locked,
+                          inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-                add_callback=self._pack_collection.chk_index.add_callback,
-                parents=False, is_locked=self.is_locked,
-                inconsistency_fatal=False),
+                          add_callback=self._pack_collection.chk_index.add_callback,
+                          parents=False, is_locked=self.is_locked,
+                          inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
@@ -878,11 +876,11 @@
         # make inventory
         serializer = self._format._serializer
         result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
-            maximum_size=serializer.maximum_size,
-            search_key_name=serializer.search_key_name)
+                                                       maximum_size=serializer.maximum_size,
+                                                       search_key_name=serializer.search_key_name)
         inv_lines = result.to_lines()
         return self._inventory_add_lines(revision_id, parents,
-            inv_lines, check_content=False)
+                                         inv_lines, check_content=False)
 
     def _create_inv_from_null(self, delta, revision_id):
         """This will mutate new_inv directly.
@@ -919,7 +917,7 @@
             parent_id_basename_dict[parent_id_basename_key] = file_id
 
         new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
-            parent_id_basename_dict, maximum_size=serializer.maximum_size)
+                                     parent_id_basename_dict, maximum_size=serializer.maximum_size)
         return new_inv
 
     def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
@@ -951,31 +949,31 @@
             raise AssertionError("%r not in write group" % (self,))
         _mod_revision.check_not_reserved_id(new_revision_id)
         basis_tree = None
-        if basis_inv is None:
+        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
             if basis_revision_id == _mod_revision.NULL_REVISION:
                 new_inv = self._create_inv_from_null(delta, new_revision_id)
                 if new_inv.root_id is None:
                     raise errors.RootMissing()
                 inv_lines = new_inv.to_lines()
                 return self._inventory_add_lines(new_revision_id, parents,
-                    inv_lines, check_content=False), new_inv
+                                                 inv_lines, check_content=False), new_inv
             else:
                 basis_tree = self.revision_tree(basis_revision_id)
                 basis_tree.lock_read()
                 basis_inv = basis_tree.root_inventory
         try:
             result = basis_inv.create_by_apply_delta(delta, new_revision_id,
-                propagate_caches=propagate_caches)
+                                                     propagate_caches=propagate_caches)
             inv_lines = result.to_lines()
             return self._inventory_add_lines(new_revision_id, parents,
-                inv_lines, check_content=False), result
+                                             inv_lines, check_content=False), result
         finally:
             if basis_tree is not None:
                 basis_tree.unlock()
 
-    def _deserialise_inventory(self, revision_id, bytes):
-        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
-            (revision_id,))
+    def _deserialise_inventory(self, revision_id, lines):
+        return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
+                                                  (revision_id,))
 
     def _iter_inventories(self, revision_ids, ordering):
         """Iterate over many inventory objects."""
@@ -986,16 +984,16 @@
         texts = {}
         for record in stream:
             if record.storage_kind != 'absent':
-                texts[record.key] = record.get_bytes_as('fulltext')
+                texts[record.key] = record.get_bytes_as('lines')
             else:
                 texts[record.key] = None
         for key in keys:
-            bytes = texts[key]
-            if bytes is None:
+            lines = texts[key]
+            if lines is None:
                 yield (None, key[-1])
             else:
                 yield (inventory.CHKInventory.deserialise(
-                    self.chk_bytes, bytes, key), key[-1])
+                    self.chk_bytes, lines, key), key[-1])
 
     def _get_inventory_xml(self, revision_id):
         """Get serialized inventory as a string."""
@@ -1004,7 +1002,7 @@
         # it allowing _get_inventory_xml to work. Bundles currently use the
         # serializer directly; this also isn't ideal, but there isn't an xml
         # iteration interface offered at all for repositories.
-        return self._serializer.write_inventory_to_string(
+        return self._serializer.write_inventory_to_lines(
             self.get_inventory(revision_id))
 
     def _find_present_inventory_keys(self, revision_keys):
@@ -1025,8 +1023,7 @@
         rich_root = self.supports_rich_root()
         bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
         file_id_revisions = {}
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
+        with ui.ui_factory.nested_progress_bar() as pb:
             revision_keys = [(r,) for r in revision_ids]
             parent_keys = self._find_parent_keys_of_revisions(revision_keys)
             # TODO: instead of using _find_present_inventory_keys, change the
@@ -1034,7 +1031,7 @@
             #       However, we only want to tolerate missing parent
             #       inventories, not missing inventories for revision_ids
             present_parent_inv_keys = self._find_present_inventory_keys(
-                                        parent_keys)
+                parent_keys)
             present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
             inventories_to_read = set(revision_ids)
             inventories_to_read.update(present_parent_inv_ids)
@@ -1044,8 +1041,8 @@
             uninteresting_root_keys = root_key_info.uninteresting_root_keys
             chk_bytes = self.chk_bytes
             for record, items in chk_map.iter_interesting_nodes(chk_bytes,
-                        interesting_root_keys, uninteresting_root_keys,
-                        pb=pb):
+                                                                interesting_root_keys, uninteresting_root_keys,
+                                                                pb=pb):
                 for name, bytes in items:
                     (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                     # TODO: consider interning file_id, revision_id here, or
@@ -1058,8 +1055,6 @@
                         file_id_revisions[file_id].add(revision_id)
                     except KeyError:
                         file_id_revisions[file_id] = {revision_id}
-        finally:
-            pb.finished()
         return file_id_revisions
 
     def find_text_key_references(self):
@@ -1077,8 +1072,7 @@
         revision_keys = self.revisions.keys()
         result = {}
         rich_roots = self.supports_rich_root()
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
+        with ui.ui_factory.nested_progress_bar() as pb:
             all_revs = self.all_revision_ids()
             total = len(all_revs)
             for pos, inv in enumerate(self.iter_inventories(all_revs)):
@@ -1091,18 +1085,16 @@
                     if entry.revision == inv.revision_id:
                         result[key] = True
             return result
-        finally:
-            pb.finished()
 
     def reconcile_canonicalize_chks(self):
         """Reconcile this repository to make sure all CHKs are in canonical
         form.
         """
-        from breezy.reconcile import PackReconciler
+        from .reconcile import PackReconciler
         with self.lock_write():
-            reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
-            reconciler.reconcile()
-            return reconciler
+            reconciler = PackReconciler(
+                self, thorough=True, canonicalize_chks=True)
+            return reconciler.reconcile()
 
     def _reconcile_pack(self, collection, packs, extension, revs, pb):
         packer = GCCHKReconcilePacker(collection, packs, extension)
@@ -1141,11 +1133,11 @@
                 pass
             parent_map = vf.get_parent_map([(revid,)])
             parents_according_to_index = tuple(parent[-1] for parent in
-                parent_map[(revid,)])
+                                               parent_map[(revid,)])
             parents_according_to_revision = tuple(revision.parent_ids)
             if parents_according_to_index != parents_according_to_revision:
                 yield (revid, parents_according_to_index,
-                    parents_according_to_revision)
+                       parents_according_to_revision)
 
     def _check_for_inconsistent_revision_parents(self):
         inconsistencies = list(self._find_inconsistent_revision_parents())
@@ -1174,6 +1166,7 @@
         """
         self._chk_id_roots = []
         self._chk_p_id_roots = []
+
         def _filtered_inv_stream():
             id_roots_set = set()
             p_id_roots_set = set()
@@ -1186,8 +1179,8 @@
                         continue
                     else:
                         raise errors.NoSuchRevision(self, record.key)
-                bytes = record.get_bytes_as('fulltext')
-                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+                lines = record.get_bytes_as('lines')
+                chk_inv = inventory.CHKInventory.deserialise(None, lines,
                                                              record.key)
                 key = chk_inv.id_to_entry.key()
                 if key not in id_roots_set:
@@ -1219,7 +1212,7 @@
             # TODO: Update Repository.iter_inventories() to add
             #       ignore_missing=True
             present_keys = self.from_repository._find_present_inventory_keys(
-                            excluded_revision_keys)
+                excluded_revision_keys)
             present_ids = [k[-1] for k in present_keys]
             uninteresting_root_keys = set()
             uninteresting_pid_root_keys = set()
@@ -1228,19 +1221,21 @@
                 uninteresting_pid_root_keys.add(
                     inv.parent_id_basename_to_file_id.key())
         chk_bytes = self.from_repository.chk_bytes
+
         def _filter_id_to_entry():
             interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
-                        self._chk_id_roots, uninteresting_root_keys)
+                                                               self._chk_id_roots, uninteresting_root_keys)
             for record in _filter_text_keys(interesting_nodes, self._text_keys,
-                    chk_map._bytes_to_text_key):
+                                            chk_map._bytes_to_text_key):
                 if record is not None:
                     yield record
             # Consumed
             self._chk_id_roots = None
         yield 'chk_bytes', _filter_id_to_entry()
+
         def _get_parent_id_basename_to_file_id_pages():
             for record, items in chk_map.iter_interesting_nodes(chk_bytes,
-                        self._chk_p_id_roots, uninteresting_pid_root_keys):
+                                                                self._chk_p_id_roots, uninteresting_pid_root_keys):
                 if record is not None:
                     yield record
             # Consumed
@@ -1251,7 +1246,7 @@
         # Note: We know we don't have to handle adding root keys, because both
         # the source and target are the identical network name.
         text_stream = self.from_repository.texts.get_record_stream(
-                        self._text_keys, self._text_fetch_order, False)
+            self._text_keys, self._text_fetch_order, False)
         return ('texts', text_stream)
 
     def get_stream(self, search):
@@ -1267,35 +1262,34 @@
                 yield record
 
         revision_ids = search.get_keys()
-        pb = ui.ui_factory.nested_progress_bar()
-        rc = self._record_counter
-        self._record_counter.setup(len(revision_ids))
-        for stream_info in self._fetch_revision_texts(revision_ids):
-            yield (stream_info[0],
-                wrap_and_count(pb, rc, stream_info[1]))
-        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
-        # TODO: The keys to exclude might be part of the search recipe
-        # For now, exclude all parents that are at the edge of ancestry, for
-        # which we have inventories
-        from_repo = self.from_repository
-        parent_keys = from_repo._find_parent_keys_of_revisions(
-                        self._revision_keys)
-        self.from_repository.revisions.clear_cache()
-        self.from_repository.signatures.clear_cache()
-        # Clear the repo's get_parent_map cache too.
-        self.from_repository._unstacked_provider.disable_cache()
-        self.from_repository._unstacked_provider.enable_cache()
-        s = self._get_inventory_stream(self._revision_keys)
-        yield (s[0], wrap_and_count(pb, rc, s[1]))
-        self.from_repository.inventories.clear_cache()
-        for stream_info in self._get_filtered_chk_streams(parent_keys):
-            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
-        self.from_repository.chk_bytes.clear_cache()
-        s = self._get_text_stream()
-        yield (s[0], wrap_and_count(pb, rc, s[1]))
-        self.from_repository.texts.clear_cache()
-        pb.update('Done', rc.max, rc.max)
-        pb.finished()
+        with ui.ui_factory.nested_progress_bar() as pb:
+            rc = self._record_counter
+            self._record_counter.setup(len(revision_ids))
+            for stream_info in self._fetch_revision_texts(revision_ids):
+                yield (stream_info[0],
+                       wrap_and_count(pb, rc, stream_info[1]))
+            self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+            # TODO: The keys to exclude might be part of the search recipe
+            # For now, exclude all parents that are at the edge of ancestry, for
+            # which we have inventories
+            from_repo = self.from_repository
+            parent_keys = from_repo._find_parent_keys_of_revisions(
+                self._revision_keys)
+            self.from_repository.revisions.clear_cache()
+            self.from_repository.signatures.clear_cache()
+            # Clear the repo's get_parent_map cache too.
+            self.from_repository._unstacked_provider.disable_cache()
+            self.from_repository._unstacked_provider.enable_cache()
+            s = self._get_inventory_stream(self._revision_keys)
+            yield (s[0], wrap_and_count(pb, rc, s[1]))
+            self.from_repository.inventories.clear_cache()
+            for stream_info in self._get_filtered_chk_streams(parent_keys):
+                yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
+            self.from_repository.chk_bytes.clear_cache()
+            s = self._get_text_stream()
+            yield (s[0], wrap_and_count(pb, rc, s[1]))
+            self.from_repository.texts.clear_cache()
+            pb.update('Done', rc.max, rc.max)
 
     def get_stream_for_missing_keys(self, missing_keys):
         # missing keys can only occur when we are byte copying and not
@@ -1305,12 +1299,12 @@
         for key in missing_keys:
             if key[0] != 'inventories':
                 raise AssertionError('The only missing keys we should'
-                    ' be filling in are inventory keys, not %s'
-                    % (key[0],))
+                                     ' be filling in are inventory keys, not %s'
+                                     % (key[0],))
             missing_inventory_keys.add(key[1:])
         if self._chk_id_roots or self._chk_p_id_roots:
             raise AssertionError('Cannot call get_stream_for_missing_keys'
-                ' until all of get_stream() has been consumed.')
+                                 ' until all of get_stream() has been consumed.')
         # Yield the inventory stream, so we can find the chk stream
         # Some of the missing_keys will be missing because they are ghosts.
         # As such, we can ignore them. The Sink is required to verify there are
@@ -1371,7 +1365,7 @@
     repository_class = CHKInventoryRepository
     supports_external_lookups = True
     supports_chks = True
-    _commit_builder_class = PackRootCommitBuilder
+    _commit_builder_class = PackCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
@@ -1385,9 +1379,11 @@
     # multiple in-a-row (and sharing strings). Topological is better
     # for remote, because we access less data.
     _fetch_order = 'unordered'
-    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
+    # essentially ignored by the groupcompress code.
+    _fetch_uses_deltas = False
     fast_deltas = True
     pack_compresses = True
+    supports_tree_reference = True
 
     def _get_matching_bzrdir(self):
         return controldir.format_registry.make_controldir('2a')
@@ -1395,16 +1391,17 @@
     def _ignore_setting_bzrdir(self, format):
         pass
 
-    _matchingcontroldir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+    _matchingcontroldir = property(
+        _get_matching_bzrdir, _ignore_setting_bzrdir)
 
     @classmethod
     def get_format_string(cls):
-        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+        return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'
 
     def get_format_description(self):
         """See RepositoryFormat.get_format_description()."""
         return ("Repository format 2a - rich roots, group compression"
-            " and chk inventories")
+                " and chk inventories")
 
 
 class RepositoryFormat2aSubtree(RepositoryFormat2a):
@@ -1418,11 +1415,12 @@
     def _ignore_setting_bzrdir(self, format):
         pass
 
-    _matchingcontroldir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+    _matchingcontroldir = property(
+        _get_matching_bzrdir, _ignore_setting_bzrdir)
 
     @classmethod
     def get_format_string(cls):
-        return ('Bazaar development format 8\n')
+        return b'Bazaar development format 8\n'
 
     def get_format_description(self):
         """See RepositoryFormat.get_format_description()."""