/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to breezy/repofmt/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.
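
To reproduce the diff below locally (a sketch, assuming the branch above has been fetched and contains the revision id shown), use:
bzr diff -c revid:jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj breezy/repofmt/groupcompress_repo.py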

--- breezy/repofmt/groupcompress_repo.py
+++ breezy/repofmt/groupcompress_repo.py
@@ -16,6 +16,8 @@
 
 """Repository formats using CHK inventories and groupcompress compression."""
 
+from __future__ import absolute_import
+
 import time
 
 from .. import (
@@ -39,6 +41,7 @@
     BTreeGraphIndex,
     BTreeBuilder,
     )
+from ..decorators import needs_write_lock
 from ..bzr.groupcompress import (
     _GCGraphIndex,
     GroupCompressVersionedFiles,
@@ -48,7 +51,7 @@
     Pack,
     NewPack,
     PackRepository,
-    PackCommitBuilder,
+    PackRootCommitBuilder,
     RepositoryPackCollection,
     RepositoryFormatPack,
     ResumedPack,
@@ -57,6 +60,10 @@
 from ..bzr.vf_repository import (
     StreamSource,
     )
+from ..sixish import (
+    viewitems,
+    viewvalues,
+    )
 from ..static_tuple import StaticTuple
 
 
@@ -85,22 +92,22 @@
         else:
             chk_index = None
         Pack.__init__(self,
-                      # Revisions: parents list, no text compression.
-                      index_builder_class(reference_lists=1),
-                      # Inventory: We want to map compression only, but currently the
-                      # knit code hasn't been updated enough to understand that, so we
-                      # have a regular 2-list index giving parents and compression
-                      # source.
-                      index_builder_class(reference_lists=1),
-                      # Texts: per file graph, for all fileids - so one reference list
-                      # and two elements in the key tuple.
-                      index_builder_class(reference_lists=1, key_elements=2),
-                      # Signatures: Just blobs to store, no compression, no parents
-                      # listing.
-                      index_builder_class(reference_lists=0),
-                      # CHK based storage - just blobs, no compression or parents.
-                      chk_index=chk_index
-                      )
+            # Revisions: parents list, no text compression.
+            index_builder_class(reference_lists=1),
+            # Inventory: We want to map compression only, but currently the
+            # knit code hasn't been updated enough to understand that, so we
+            # have a regular 2-list index giving parents and compression
+            # source.
+            index_builder_class(reference_lists=1),
+            # Texts: per file graph, for all fileids - so one reference list
+            # and two elements in the key tuple.
+            index_builder_class(reference_lists=1, key_elements=2),
+            # Signatures: Just blobs to store, no compression, no parents
+            # listing.
+            index_builder_class(reference_lists=0),
+            # CHK based storage - just blobs, no compression or parents.
+            chk_index=chk_index
+            )
         self._pack_collection = pack_collection
         # When we make readonly indices, we need this.
         self.index_class = pack_collection._index_class
@@ -131,8 +138,8 @@
             self.random_name, mode=self._file_mode)
         if 'pack' in debug.debug_flags:
             trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
-                         time.ctime(), self.upload_transport.base, self.random_name,
-                         time.time() - self.start_time)
+                time.ctime(), self.upload_transport.base, self.random_name,
+                time.time() - self.start_time)
         # A list of byte sequences to be written to the new pack, and the
         # aggregate size of them.  Stored as a list rather than separate
         # variables so that the _write_data closure below can update them.
@@ -142,16 +149,15 @@
         # robertc says- this is a closure rather than a method on the object
         # so that the variables are locals, and faster than accessing object
         # members.
-
-        def _write_data(data, flush=False, _buffer=self._buffer,
-                        _write=self.write_stream.write, _update=self._hash.update):
-            _buffer[0].append(data)
-            _buffer[1] += len(data)
+        def _write_data(bytes, flush=False, _buffer=self._buffer,
+            _write=self.write_stream.write, _update=self._hash.update):
+            _buffer[0].append(bytes)
+            _buffer[1] += len(bytes)
             # buffer cap
             if _buffer[1] > self._cache_limit or flush:
-                data = b''.join(_buffer[0])
-                _write(data)
-                _update(data)
+                bytes = ''.join(_buffer[0])
+                _write(bytes)
+                _update(bytes)
                 _buffer[:] = [[], 0]
         # expose this on self, for the occasion when clients want to add data.
         self._write_data = _write_data
@@ -199,8 +205,7 @@
         self._pack_collection = pack_collection
         # ATM, We only support this for GCCHK repositories
         if pack_collection.chk_index is None:
-            raise AssertionError(
-                'pack_collection.chk_index should not be None')
+            raise AssertionError('pack_collection.chk_index should not be None')
         self._gather_text_refs = False
         self._chk_id_roots = []
         self._chk_p_id_roots = []
@@ -210,8 +215,7 @@
 
     def _get_progress_stream(self, source_vf, keys, message, pb):
         def pb_stream():
-            substream = source_vf.get_record_stream(
-                keys, 'groupcompress', True)
+            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
             for idx, record in enumerate(substream):
                 if pb is not None:
                     pb.update(message, idx + 1, len(keys))
@@ -221,16 +225,15 @@
     def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
         """Filter the texts of inventories, to find the chk pages."""
         total_keys = len(keys)
-
         def _filtered_inv_stream():
             id_roots_set = set()
             p_id_roots_set = set()
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                lines = record.get_bytes_as('lines')
-                chk_inv = inventory.CHKInventory.deserialise(
-                    None, lines, record.key)
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+                                                             record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 key = chk_inv.id_to_entry.key()
@@ -272,16 +275,14 @@
         counter = [0]
         if self._gather_text_refs:
             self._text_refs = set()
-
         def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
             cur_keys = root_keys
             while cur_keys:
                 keys_by_search_prefix = {}
                 remaining_keys.difference_update(cur_keys)
                 next_keys = set()
-
                 def handle_internal_node(node):
-                    for prefix, value in node._items.items():
+                    for prefix, value in viewitems(node._items):
                         # We don't want to request the same key twice, and we
                         # want to order it by the first time it is seen.
                         # Even further, we don't want to request a key which is
@@ -293,15 +294,13 @@
                         #       always fill them in for stacked branches
                         if value not in next_keys and value in remaining_keys:
                             keys_by_search_prefix.setdefault(prefix,
-                                                             []).append(value)
+                                []).append(value)
                             next_keys.add(value)
-
                 def handle_leaf_node(node):
                     # Store is None, because we know we have a LeafNode, and we
                     # just want its entries
                     for file_id, bytes in node.iteritems(None):
                         self._text_refs.add(chk_map._bytes_to_text_key(bytes))
-
                 def next_stream():
                     stream = source_vf.get_record_stream(cur_keys,
                                                          'as-requested', True)
@@ -404,11 +403,15 @@
                      pb_offset):
         trace.mutter('repacking %d %s', len(keys), message)
         self.pb.update('repacking %s' % (message,), pb_offset)
-        with ui.ui_factory.nested_progress_bar() as child_pb:
+        child_pb = ui.ui_factory.nested_progress_bar()
+        try:
             stream = vf_to_stream(source_vf, keys, message, child_pb)
-            for _, _ in target_vf._insert_record_stream(
-                    stream, random_id=True, reuse_blocks=False):
+            for _ in target_vf._insert_record_stream(stream,
+                                                     random_id=True,
+                                                     reuse_blocks=False):
                 pass
+        finally:
+            child_pb.finished()
 
     def _copy_revision_texts(self):
         source_vf, target_vf = self._build_vfs('revision', True, False)
@@ -426,8 +429,7 @@
         # get_parent_map(self.revision_keys), but that shouldn't be any faster
         # than this.
         inventory_keys = source_vf.keys()
-        missing_inventories = set(
-            self.revision_keys).difference(inventory_keys)
+        missing_inventories = set(self.revision_keys).difference(inventory_keys)
         if missing_inventories:
             # Go back to the original repo, to see if these are really missing
             # https://bugs.launchpad.net/bzr/+bug/437003
@@ -440,7 +442,7 @@
             if really_missing:
                 missing_inventories = sorted(really_missing)
                 raise ValueError('We are missing inventories for revisions: %s'
-                                 % (missing_inventories,))
+                    % (missing_inventories,))
         self._copy_stream(source_vf, target_vf, inventory_keys,
                           'inventories', self._get_filtered_inv_stream, 2)
 
@@ -457,12 +459,16 @@
                      len(self._chk_id_roots), len(self._chk_p_id_roots),
                      len(total_keys))
         self.pb.update('repacking chk', 3)
-        with ui.ui_factory.nested_progress_bar() as child_pb:
+        child_pb = ui.ui_factory.nested_progress_bar()
+        try:
             for stream in self._get_chk_streams(source_vf, total_keys,
                                                 pb=child_pb):
-                for _, _ in target_vf._insert_record_stream(
-                        stream, random_id=True, reuse_blocks=False):
+                for _ in target_vf._insert_record_stream(stream,
+                                                         random_id=True,
+                                                         reuse_blocks=False):
                     pass
+        finally:
+            child_pb.finished()
 
     def _copy_text_texts(self):
         source_vf, target_vf = self._build_vfs('text', True, True)
@@ -486,7 +492,7 @@
         self.pb.update('repacking', 0, 7)
         self.new_pack = self.open_pack()
         # Is this necessary for GC ?
-        self.new_pack.set_write_cache_size(1024 * 1024)
+        self.new_pack.set_write_cache_size(1024*1024)
         self._copy_revision_texts()
         self._copy_inventory_texts()
         self._copy_chk_texts()
@@ -502,7 +508,7 @@
             if old_pack.name == self.new_pack._hash.hexdigest():
                 # The single old pack was already optimally packed.
                 trace.mutter('single pack %s was already optimally packed',
-                             old_pack.name)
+                    old_pack.name)
                 self.new_pack.abort()
                 return None
         self.pb.update('finishing repack', 6, 7)
@@ -543,7 +549,7 @@
         ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
         # Strip keys back into revision_ids.
         ancestors = dict((k[0], tuple([p[0] for p in parents]))
-                         for k, parents in ancestor_keys.items())
+                         for k, parents in viewitems(ancestor_keys))
         del ancestor_keys
         # TODO: _generate_text_key_index should be much cheaper to generate from
         #       a chk repository, rather than the current implementation
@@ -552,7 +558,7 @@
         # 2) generate a keys list that contains all the entries that can
         #    be used as-is, with corrected parents.
         ok_keys = []
-        new_parent_keys = {}  # (key, parent_keys)
+        new_parent_keys = {} # (key, parent_keys)
         discarded_keys = []
         NULL_REVISION = _mod_revision.NULL_REVISION
         for key in self._text_refs:
@@ -582,10 +588,9 @@
         del ideal_index
         del file_id_parent_map
         # 3) bulk copy the data, updating records than need it
-
        def _update_parents_for_texts():
             stream = source_vf.get_record_stream(self._text_refs,
-                                                 'groupcompress', False)
+                'groupcompress', False)
             for record in stream:
                 if record.key in new_parent_keys:
                     record.parents = new_parent_keys[record.key]
@@ -599,7 +604,7 @@
 
 class GCCHKCanonicalizingPacker(GCCHKPacker):
     """A packer that ensures inventories have canonical-form CHK maps.
-
+    
     Ideally this would be part of reconcile, but it's very slow and rarely
     needed.  (It repairs repositories affected by
     https://bugs.launchpad.net/bzr/+bug/522637).
@@ -615,8 +620,11 @@
         This is useful to get the side-effects of generating a stream.
         """
         self.pb.update('scanning %s' % (message,), pb_offset)
-        with ui.ui_factory.nested_progress_bar() as child_pb:
+        child_pb = ui.ui_factory.nested_progress_bar()
+        try:
             list(vf_to_stream(source_vf, keys, message, child_pb))
+        finally:
+            child_pb.finished()
 
     def _copy_inventory_texts(self):
         source_vf, target_vf = self._build_vfs('inventory', True, True)
@@ -625,14 +633,13 @@
         # First, copy the existing CHKs on the assumption that most of them
         # will be correct.  This will save us from having to reinsert (and
         # recompress) these records later at the cost of perhaps preserving a
-        # few unused CHKs.
+        # few unused CHKs. 
         # (Iterate but don't insert _get_filtered_inv_stream to populate the
         # variables needed by GCCHKPacker._copy_chk_texts.)
         self._exhaust_stream(source_vf, inventory_keys, 'inventories',
-                             self._get_filtered_inv_stream, 2)
+                self._get_filtered_inv_stream, 2)
         GCCHKPacker._copy_chk_texts(self)
         # Now copy and fix the inventories, and any regenerated CHKs.
-
         def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
             return self._get_filtered_canonicalizing_inv_stream(
                 source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
@@ -644,28 +651,27 @@
         pass
 
     def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
-                                                pb=None, source_chk_vf=None, target_chk_vf=None):
+            pb=None, source_chk_vf=None, target_chk_vf=None):
         """Filter the texts of inventories, regenerating CHKs to make sure they
         are canonical.
         """
         total_keys = len(keys)
         target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
-
         def _filtered_inv_stream():
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             search_key_name = None
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                lines = record.get_bytes_as('lines')
+                bytes = record.get_bytes_as('fulltext')
                 chk_inv = inventory.CHKInventory.deserialise(
-                    source_chk_vf, lines, record.key)
+                    source_chk_vf, bytes, record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 chk_inv.id_to_entry._ensure_root()
                 if search_key_name is None:
                     # Find the name corresponding to the search_key_func
                     search_key_reg = chk_map.search_key_registry
-                    for search_key_name, func in search_key_reg.items():
+                    for search_key_name, func in viewitems(search_key_reg):
                         if func == chk_inv.id_to_entry._search_key_func:
                             break
                 canonical_inv = inventory.CHKInventory.from_inventory(
@@ -676,9 +682,8 @@
                     trace.mutter(
                         'Non-canonical CHK map for id_to_entry of inv: %s '
                         '(root is %s, should be %s)' % (chk_inv.revision_id,
-                                                        chk_inv.id_to_entry.key()[
-                                                            0],
-                                                        canonical_inv.id_to_entry.key()[0]))
+                        chk_inv.id_to_entry.key()[0],
+                        canonical_inv.id_to_entry.key()[0]))
                     self._data_changed = True
                 p_id_map = chk_inv.parent_id_basename_to_file_id
                 p_id_map._ensure_root()
@@ -690,9 +695,9 @@
                         % (chk_inv.revision_id, p_id_map.key()[0],
                            canon_p_id_map.key()[0]))
                     self._data_changed = True
-                yield versionedfile.ChunkedContentFactory(
-                    record.key, record.parents, record.sha1, canonical_inv.to_lines(),
-                    chunks_are_lines=True)
+                yield versionedfile.ChunkedContentFactory(record.key,
+                        record.parents, record.sha1,
+                        canonical_inv.to_lines())
             # We have finished processing all of the inventory records, we
             # don't need these sets anymore
         return _filtered_inv_stream()
@@ -736,13 +741,13 @@
         missing_corresponding.difference_update(corresponding_invs)
         if missing_corresponding:
             problems.append("inventories missing for revisions %s" %
-                            (sorted(missing_corresponding),))
+                (sorted(missing_corresponding),))
             return problems
         # Are any chk root entries missing for any inventories?  This includes
         # any present parent inventories, which may be used when calculating
         # deltas for streaming.
         all_inv_keys = set(corresponding_invs)
-        for parent_inv_keys in inv_parent_map.values():
+        for parent_inv_keys in viewvalues(inv_parent_map):
             all_inv_keys.update(parent_inv_keys)
         # Filter out ghost parents.
         all_inv_keys.intersection_update(
@@ -795,61 +800,59 @@
         missing_text_keys = text_keys.difference(present_text_keys)
         if missing_text_keys:
             problems.append("missing text keys: %r"
-                            % (sorted(missing_text_keys),))
+                % (sorted(missing_text_keys),))
         return problems
 
 
 class CHKInventoryRepository(PackRepository):
     """subclass of PackRepository that uses CHK based inventories."""
 
-    def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
-                 _serializer):
+    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+        _serializer):
         """Overridden to change pack collection class."""
-        super(CHKInventoryRepository, self).__init__(_format, a_controldir,
-                                                     control_files, _commit_builder_class, _serializer)
+        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
+            control_files, _commit_builder_class, _serializer)
         index_transport = self._transport.clone('indices')
         self._pack_collection = GCRepositoryPackCollection(self,
-                                                           self._transport, index_transport,
-                                                           self._transport.clone(
-                                                               'upload'),
-                                                           self._transport.clone(
-                                                               'packs'),
-                                                           _format.index_builder_class,
-                                                           _format.index_class,
-                                                           use_chk_index=self._format.supports_chks,
-                                                           )
+            self._transport, index_transport,
+            self._transport.clone('upload'),
+            self._transport.clone('packs'),
+            _format.index_builder_class,
+            _format.index_class,
+            use_chk_index=self._format.supports_chks,
+            )
         self.inventories = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
-                          add_callback=self._pack_collection.inventory_index.add_callback,
-                          parents=True, is_locked=self.is_locked,
-                          inconsistency_fatal=False),
+                add_callback=self._pack_collection.inventory_index.add_callback,
+                parents=True, is_locked=self.is_locked,
+                inconsistency_fatal=False),
             access=self._pack_collection.inventory_index.data_access)
         self.revisions = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
-                          add_callback=self._pack_collection.revision_index.add_callback,
-                          parents=True, is_locked=self.is_locked,
-                          track_external_parent_refs=True, track_new_keys=True),
+                add_callback=self._pack_collection.revision_index.add_callback,
+                parents=True, is_locked=self.is_locked,
+                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
             delta=False)
         self.signatures = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
-                          add_callback=self._pack_collection.signature_index.add_callback,
-                          parents=False, is_locked=self.is_locked,
-                          inconsistency_fatal=False),
+                add_callback=self._pack_collection.signature_index.add_callback,
+                parents=False, is_locked=self.is_locked,
+                inconsistency_fatal=False),
             access=self._pack_collection.signature_index.data_access,
             delta=False)
         self.texts = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.text_index.combined_index,
-                          add_callback=self._pack_collection.text_index.add_callback,
-                          parents=True, is_locked=self.is_locked,
-                          inconsistency_fatal=False),
+                add_callback=self._pack_collection.text_index.add_callback,
+                parents=True, is_locked=self.is_locked,
+                inconsistency_fatal=False),
             access=self._pack_collection.text_index.data_access)
         # No parents, individual CHK pages don't have specific ancestry
         self.chk_bytes = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-                          add_callback=self._pack_collection.chk_index.add_callback,
-                          parents=False, is_locked=self.is_locked,
-                          inconsistency_fatal=False),
+                add_callback=self._pack_collection.chk_index.add_callback,
+                parents=False, is_locked=self.is_locked,
+                inconsistency_fatal=False),
             access=self._pack_collection.chk_index.data_access)
         search_key_name = self._format._serializer.search_key_name
         search_key_func = chk_map.search_key_registry.get(search_key_name)
@@ -876,11 +879,11 @@
         # make inventory
         serializer = self._format._serializer
         result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
-                                                       maximum_size=serializer.maximum_size,
-                                                       search_key_name=serializer.search_key_name)
+            maximum_size=serializer.maximum_size,
+            search_key_name=serializer.search_key_name)
         inv_lines = result.to_lines()
         return self._inventory_add_lines(revision_id, parents,
-                                         inv_lines, check_content=False)
+            inv_lines, check_content=False)
 
     def _create_inv_from_null(self, delta, revision_id):
         """This will mutate new_inv directly.
@@ -904,7 +907,7 @@
                                 ' no new_path %r' % (file_id,))
             if new_path == '':
                 new_inv.root_id = file_id
-                parent_id_basename_key = StaticTuple(b'', b'').intern()
+                parent_id_basename_key = StaticTuple('', '').intern()
             else:
                 utf8_entry_name = entry.name.encode('utf-8')
                 parent_id_basename_key = StaticTuple(entry.parent_id,
@@ -917,7 +920,7 @@
             parent_id_basename_dict[parent_id_basename_key] = file_id
 
         new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
-                                     parent_id_basename_dict, maximum_size=serializer.maximum_size)
+            parent_id_basename_dict, maximum_size=serializer.maximum_size)
         return new_inv
 
     def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
@@ -949,31 +952,31 @@
             raise AssertionError("%r not in write group" % (self,))
         _mod_revision.check_not_reserved_id(new_revision_id)
         basis_tree = None
-        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
+        if basis_inv is None:
             if basis_revision_id == _mod_revision.NULL_REVISION:
                 new_inv = self._create_inv_from_null(delta, new_revision_id)
                 if new_inv.root_id is None:
                     raise errors.RootMissing()
                 inv_lines = new_inv.to_lines()
                 return self._inventory_add_lines(new_revision_id, parents,
-                                                 inv_lines, check_content=False), new_inv
+                    inv_lines, check_content=False), new_inv
             else:
                 basis_tree = self.revision_tree(basis_revision_id)
                 basis_tree.lock_read()
                 basis_inv = basis_tree.root_inventory
         try:
             result = basis_inv.create_by_apply_delta(delta, new_revision_id,
-                                                     propagate_caches=propagate_caches)
+                propagate_caches=propagate_caches)
             inv_lines = result.to_lines()
             return self._inventory_add_lines(new_revision_id, parents,
-                                             inv_lines, check_content=False), result
+                inv_lines, check_content=False), result
         finally:
             if basis_tree is not None:
                 basis_tree.unlock()
 
-    def _deserialise_inventory(self, revision_id, lines):
-        return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
-                                                  (revision_id,))
+    def _deserialise_inventory(self, revision_id, bytes):
+        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
+            (revision_id,))
 
     def _iter_inventories(self, revision_ids, ordering):
         """Iterate over many inventory objects."""
@@ -984,16 +987,16 @@
         texts = {}
         for record in stream:
             if record.storage_kind != 'absent':
-                texts[record.key] = record.get_bytes_as('lines')
+                texts[record.key] = record.get_bytes_as('fulltext')
             else:
                 texts[record.key] = None
         for key in keys:
-            lines = texts[key]
-            if lines is None:
+            bytes = texts[key]
+            if bytes is None:
                 yield (None, key[-1])
             else:
                 yield (inventory.CHKInventory.deserialise(
-                    self.chk_bytes, lines, key), key[-1])
+                    self.chk_bytes, bytes, key), key[-1])
 
     def _get_inventory_xml(self, revision_id):
         """Get serialized inventory as a string."""
@@ -1002,7 +1005,7 @@
         # it allowing _get_inventory_xml to work. Bundles currently use the
         # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
-        return self._serializer.write_inventory_to_lines(
+        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))
 
     def _find_present_inventory_keys(self, revision_keys):
@@ -1023,7 +1026,8 @@
         rich_root = self.supports_rich_root()
         bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
         file_id_revisions = {}
-        with ui.ui_factory.nested_progress_bar() as pb:
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
             revision_keys = [(r,) for r in revision_ids]
             parent_keys = self._find_parent_keys_of_revisions(revision_keys)
             # TODO: instead of using _find_present_inventory_keys, change the
@@ -1031,7 +1035,7 @@
             #       However, we only want to tolerate missing parent
             #       inventories, not missing inventories for revision_ids
             present_parent_inv_keys = self._find_present_inventory_keys(
-                parent_keys)
+                                        parent_keys)
             present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
             inventories_to_read = set(revision_ids)
             inventories_to_read.update(present_parent_inv_ids)
@@ -1041,8 +1045,8 @@
             uninteresting_root_keys = root_key_info.uninteresting_root_keys
             chk_bytes = self.chk_bytes
             for record, items in chk_map.iter_interesting_nodes(chk_bytes,
-                                                                interesting_root_keys, uninteresting_root_keys,
-                                                                pb=pb):
+                        interesting_root_keys, uninteresting_root_keys,
+                        pb=pb):
                 for name, bytes in items:
                     (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                     # TODO: consider interning file_id, revision_id here, or
@@ -1055,6 +1059,8 @@
                         file_id_revisions[file_id].add(revision_id)
                     except KeyError:
                         file_id_revisions[file_id] = {revision_id}
+        finally:
+            pb.finished()
         return file_id_revisions
 
     def find_text_key_references(self):
@@ -1072,7 +1078,8 @@
         revision_keys = self.revisions.keys()
         result = {}
         rich_roots = self.supports_rich_root()
-        with ui.ui_factory.nested_progress_bar() as pb:
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
            all_revs = self.all_revision_ids()
             total = len(all_revs)
             for pos, inv in enumerate(self.iter_inventories(all_revs)):
@@ -1085,16 +1092,18 @@
                     if entry.revision == inv.revision_id:
                         result[key] = True
             return result
+        finally:
+            pb.finished()
 
+    @needs_write_lock
     def reconcile_canonicalize_chks(self):
         """Reconcile this repository to make sure all CHKs are in canonical
         form.
         """
-        from .reconcile import PackReconciler
-        with self.lock_write():
-            reconciler = PackReconciler(
-                self, thorough=True, canonicalize_chks=True)
-            return reconciler.reconcile()
+        from breezy.reconcile import PackReconciler
+        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
+        reconciler.reconcile()
+        return reconciler
 
     def _reconcile_pack(self, collection, packs, extension, revs, pb):
         packer = GCCHKReconcilePacker(collection, packs, extension)
@@ -1127,17 +1136,17 @@
             raise AssertionError()
         vf = self.revisions
         if revisions_iterator is None:
-            revisions_iterator = self.iter_revisions(self.all_revision_ids())
+            revisions_iterator = self._iter_revisions(None)
         for revid, revision in revisions_iterator:
            if revision is None:
                 pass
             parent_map = vf.get_parent_map([(revid,)])
             parents_according_to_index = tuple(parent[-1] for parent in
-                                               parent_map[(revid,)])
+                parent_map[(revid,)])
             parents_according_to_revision = tuple(revision.parent_ids)
             if parents_according_to_index != parents_according_to_revision:
                 yield (revid, parents_according_to_index,
-                       parents_according_to_revision)
+                    parents_according_to_revision)
 
     def _check_for_inconsistent_revision_parents(self):
         inconsistencies = list(self._find_inconsistent_revision_parents())
@@ -1166,7 +1175,6 @@
         """
         self._chk_id_roots = []
         self._chk_p_id_roots = []
-
         def _filtered_inv_stream():
             id_roots_set = set()
             p_id_roots_set = set()
@@ -1179,8 +1187,8 @@
                         continue
                     else:
                         raise errors.NoSuchRevision(self, record.key)
-                lines = record.get_bytes_as('lines')
-                chk_inv = inventory.CHKInventory.deserialise(None, lines,
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                              record.key)
                 key = chk_inv.id_to_entry.key()
                 if key not in id_roots_set:
@@ -1212,7 +1220,7 @@
             # TODO: Update Repository.iter_inventories() to add
             #       ignore_missing=True
             present_keys = self.from_repository._find_present_inventory_keys(
-                excluded_revision_keys)
+                            excluded_revision_keys)
             present_ids = [k[-1] for k in present_keys]
             uninteresting_root_keys = set()
             uninteresting_pid_root_keys = set()
@@ -1221,21 +1229,19 @@
                 uninteresting_pid_root_keys.add(
                     inv.parent_id_basename_to_file_id.key())
         chk_bytes = self.from_repository.chk_bytes
-
         def _filter_id_to_entry():
             interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
-                                                               self._chk_id_roots, uninteresting_root_keys)
+                        self._chk_id_roots, uninteresting_root_keys)
             for record in _filter_text_keys(interesting_nodes, self._text_keys,
-                                            chk_map._bytes_to_text_key):
+                    chk_map._bytes_to_text_key):
                 if record is not None:
                     yield record
             # Consumed
             self._chk_id_roots = None
         yield 'chk_bytes', _filter_id_to_entry()
-
         def _get_parent_id_basename_to_file_id_pages():
             for record, items in chk_map.iter_interesting_nodes(chk_bytes,
-                                                                self._chk_p_id_roots, uninteresting_pid_root_keys):
+                        self._chk_p_id_roots, uninteresting_pid_root_keys):
                 if record is not None:
                     yield record
             # Consumed
@@ -1246,7 +1252,7 @@
         # Note: We know we don't have to handle adding root keys, because both
         # the source and target are the identical network name.
         text_stream = self.from_repository.texts.get_record_stream(
-            self._text_keys, self._text_fetch_order, False)
+                        self._text_keys, self._text_fetch_order, False)
         return ('texts', text_stream)
 
     def get_stream(self, search):
@@ -1262,34 +1268,35 @@
                 yield record
 
         revision_ids = search.get_keys()
-        with ui.ui_factory.nested_progress_bar() as pb:
-            rc = self._record_counter
-            self._record_counter.setup(len(revision_ids))
-            for stream_info in self._fetch_revision_texts(revision_ids):
-                yield (stream_info[0],
-                       wrap_and_count(pb, rc, stream_info[1]))
-            self._revision_keys = [(rev_id,) for rev_id in revision_ids]
-            # TODO: The keys to exclude might be part of the search recipe
-            # For now, exclude all parents that are at the edge of ancestry, for
-            # which we have inventories
-            from_repo = self.from_repository
-            parent_keys = from_repo._find_parent_keys_of_revisions(
-                self._revision_keys)
-            self.from_repository.revisions.clear_cache()
-            self.from_repository.signatures.clear_cache()
-            # Clear the repo's get_parent_map cache too.
-            self.from_repository._unstacked_provider.disable_cache()
-            self.from_repository._unstacked_provider.enable_cache()
-            s = self._get_inventory_stream(self._revision_keys)
-            yield (s[0], wrap_and_count(pb, rc, s[1]))
-            self.from_repository.inventories.clear_cache()
-            for stream_info in self._get_filtered_chk_streams(parent_keys):
-                yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
-            self.from_repository.chk_bytes.clear_cache()
-            s = self._get_text_stream()
-            yield (s[0], wrap_and_count(pb, rc, s[1]))
-            self.from_repository.texts.clear_cache()
-            pb.update('Done', rc.max, rc.max)
+        pb = ui.ui_factory.nested_progress_bar()
+        rc = self._record_counter
+        self._record_counter.setup(len(revision_ids))
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield (stream_info[0],
+                wrap_and_count(pb, rc, stream_info[1]))
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        # TODO: The keys to exclude might be part of the search recipe
+        # For now, exclude all parents that are at the edge of ancestry, for
+        # which we have inventories
+        from_repo = self.from_repository
+        parent_keys = from_repo._find_parent_keys_of_revisions(
+                        self._revision_keys)
+        self.from_repository.revisions.clear_cache()
+        self.from_repository.signatures.clear_cache()
+        # Clear the repo's get_parent_map cache too.
+        self.from_repository._unstacked_provider.disable_cache()
+        self.from_repository._unstacked_provider.enable_cache()
+        s = self._get_inventory_stream(self._revision_keys)
+        yield (s[0], wrap_and_count(pb, rc, s[1]))
+        self.from_repository.inventories.clear_cache()
+        for stream_info in self._get_filtered_chk_streams(parent_keys):
+            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
+        self.from_repository.chk_bytes.clear_cache()
+        s = self._get_text_stream()
+        yield (s[0], wrap_and_count(pb, rc, s[1]))
+        self.from_repository.texts.clear_cache()
+        pb.update('Done', rc.max, rc.max)
+        pb.finished()
 
     def get_stream_for_missing_keys(self, missing_keys):
         # missing keys can only occur when we are byte copying and not
@@ -1299,12 +1306,12 @@
         for key in missing_keys:
             if key[0] != 'inventories':
                 raise AssertionError('The only missing keys we should'
-                                     ' be filling in are inventory keys, not %s'
-                                     % (key[0],))
+                    ' be filling in are inventory keys, not %s'
+                    % (key[0],))
             missing_inventory_keys.add(key[1:])
         if self._chk_id_roots or self._chk_p_id_roots:
             raise AssertionError('Cannot call get_stream_for_missing_keys'
-                                 ' until all of get_stream() has been consumed.')
+                ' until all of get_stream() has been consumed.')
         # Yield the inventory stream, so we can find the chk stream
         # Some of the missing_keys will be missing because they are ghosts.
         # As such, we can ignore them. The Sink is required to verify there are
@@ -1355,7 +1362,7 @@
     """
     text_keys_update = text_keys.update
     for record, items in interesting_nodes_iterable:
-        text_keys_update([bytes_to_text_key(b) for n, b in items])
+        text_keys_update([bytes_to_text_key(b) for n,b in items])
         yield record
 
 
@@ -1365,7 +1372,7 @@
     repository_class = CHKInventoryRepository
     supports_external_lookups = True
     supports_chks = True
-    _commit_builder_class = PackCommitBuilder
+    _commit_builder_class = PackRootCommitBuilder
     rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
     _commit_inv_deltas = True
@@ -1379,29 +1386,26 @@
     # multiple in-a-row (and sharing strings). Topological is better
     # for remote, because we access less data.
     _fetch_order = 'unordered'
-    # essentially ignored by the groupcompress code.
-    _fetch_uses_deltas = False
+    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
     fast_deltas = True
     pack_compresses = True
-    supports_tree_reference = True
 
     def _get_matching_bzrdir(self):
-        return controldir.format_registry.make_controldir('2a')
+        return controldir.format_registry.make_bzrdir('2a')
 
     def _ignore_setting_bzrdir(self, format):
         pass
 
-    _matchingcontroldir = property(
-        _get_matching_bzrdir, _ignore_setting_bzrdir)
+    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
     @classmethod
     def get_format_string(cls):
-        return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'
+        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
 
     def get_format_description(self):
         """See RepositoryFormat.get_format_description()."""
         return ("Repository format 2a - rich roots, group compression"
-                " and chk inventories")
+            " and chk inventories")
 
 
 class RepositoryFormat2aSubtree(RepositoryFormat2a):
@@ -1410,17 +1414,16 @@
     """
 
     def _get_matching_bzrdir(self):
-        return controldir.format_registry.make_controldir('development-subtree')
+        return controldir.format_registry.make_bzrdir('development-subtree')
 
     def _ignore_setting_bzrdir(self, format):
         pass

-    _matchingcontroldir = property(
-        _get_matching_bzrdir, _ignore_setting_bzrdir)
+    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
     @classmethod
     def get_format_string(cls):
-        return b'Bazaar development format 8\n'
+        return ('Bazaar development format 8\n')
 
     def get_format_description(self):
         """See RepositoryFormat.get_format_description()."""