/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
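If you already have the renamed client that this branch works toward, the equivalent invocation should be the following (assuming the same mirror URL is still reachable; `brz` is the command name Breezy ended up shipping):
brz branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar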


Viewing changes to brzlib/repofmt/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

The diff below shows, for each line, its line number in the two texts being compared; lines prefixed with '-' appear only in the first (left-hand) text, and lines prefixed with '+' only in the second (right-hand) text.
  16   16 
  17   17  """Repository formats using CHK inventories and groupcompress compression."""
  18   18 
       19 +from __future__ import absolute_import
       20 +
  19   21  import time
  20   22 
  21      -from .. import (
       23 +from brzlib import (
  22   24      controldir,
       25 +    chk_map,
       26 +    chk_serializer,
  23   27      debug,
  24   28      errors,
       29 +    index as _mod_index,
       30 +    inventory,
  25   31      osutils,
       32 +    pack,
  26   33      revision as _mod_revision,
  27   34      trace,
  28   35      ui,
  29      -    )
  30      -from ..bzr import (
  31      -    chk_map,
  32      -    chk_serializer,
  33      -    index as _mod_index,
  34      -    inventory,
  35      -    pack,
  36   36      versionedfile,
  37   37      )
  38      -from ..bzr.btree_index import (
       38 +from brzlib.btree_index import (
  39   39      BTreeGraphIndex,
  40   40      BTreeBuilder,
  41   41      )
  42      -from ..bzr.groupcompress import (
       42 +from brzlib.decorators import needs_write_lock
       43 +from brzlib.groupcompress import (
  43   44      _GCGraphIndex,
  44   45      GroupCompressVersionedFiles,
  45   46      )
  46      -from .pack_repo import (
       47 +from brzlib.repofmt.pack_repo import (
  47   48      _DirectPackAccess,
  48   49      Pack,
  49   50      NewPack,
  50   51      PackRepository,
  51      -    PackCommitBuilder,
       52 +    PackRootCommitBuilder,
  52   53      RepositoryPackCollection,
  53   54      RepositoryFormatPack,
  54   55      ResumedPack,
  55   56      Packer,
  56   57      )
  57      -from ..bzr.vf_repository import (
       58 +from brzlib.vf_repository import (
  58   59      StreamSource,
  59   60      )
  60      -from ..static_tuple import StaticTuple
       61 +from brzlib.static_tuple import StaticTuple
  61   62 
  62   63 
  63   64  class GCPack(NewPack):

  85   86          else:
  86   87              chk_index = None
  87   88          Pack.__init__(self,
  88      -                      # Revisions: parents list, no text compression.
  89      -                      index_builder_class(reference_lists=1),
  90      -                      # Inventory: We want to map compression only, but currently the
  91      -                      # knit code hasn't been updated enough to understand that, so we
  92      -                      # have a regular 2-list index giving parents and compression
  93      -                      # source.
  94      -                      index_builder_class(reference_lists=1),
  95      -                      # Texts: per file graph, for all fileids - so one reference list
  96      -                      # and two elements in the key tuple.
  97      -                      index_builder_class(reference_lists=1, key_elements=2),
  98      -                      # Signatures: Just blobs to store, no compression, no parents
  99      -                      # listing.
 100      -                      index_builder_class(reference_lists=0),
 101      -                      # CHK based storage - just blobs, no compression or parents.
 102      -                      chk_index=chk_index
 103      -                      )
       89 +            # Revisions: parents list, no text compression.
       90 +            index_builder_class(reference_lists=1),
       91 +            # Inventory: We want to map compression only, but currently the
       92 +            # knit code hasn't been updated enough to understand that, so we
       93 +            # have a regular 2-list index giving parents and compression
       94 +            # source.
       95 +            index_builder_class(reference_lists=1),
       96 +            # Texts: per file graph, for all fileids - so one reference list
       97 +            # and two elements in the key tuple.
       98 +            index_builder_class(reference_lists=1, key_elements=2),
       99 +            # Signatures: Just blobs to store, no compression, no parents
      100 +            # listing.
      101 +            index_builder_class(reference_lists=0),
      102 +            # CHK based storage - just blobs, no compression or parents.
      103 +            chk_index=chk_index
      104 +            )
 104  105          self._pack_collection = pack_collection
 105  106          # When we make readonly indices, we need this.
 106  107          self.index_class = pack_collection._index_class

 131  132              self.random_name, mode=self._file_mode)
 132  133          if 'pack' in debug.debug_flags:
 133  134              trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
 134      -                         time.ctime(), self.upload_transport.base, self.random_name,
 135      -                         time.time() - self.start_time)
      135 +                time.ctime(), self.upload_transport.base, self.random_name,
      136 +                time.time() - self.start_time)
 136  137          # A list of byte sequences to be written to the new pack, and the
 137  138          # aggregate size of them.  Stored as a list rather than separate
 138  139          # variables so that the _write_data closure below can update them.

 142  143          # robertc says- this is a closure rather than a method on the object
 143  144          # so that the variables are locals, and faster than accessing object
 144  145          # members.
 145      -
 146      -        def _write_data(data, flush=False, _buffer=self._buffer,
 147      -                        _write=self.write_stream.write, _update=self._hash.update):
 148      -            _buffer[0].append(data)
 149      -            _buffer[1] += len(data)
      146 +        def _write_data(bytes, flush=False, _buffer=self._buffer,
      147 +            _write=self.write_stream.write, _update=self._hash.update):
      148 +            _buffer[0].append(bytes)
      149 +            _buffer[1] += len(bytes)
 150  150              # buffer cap
 151  151              if _buffer[1] > self._cache_limit or flush:
 152      -                data = b''.join(_buffer[0])
 153      -                _write(data)
 154      -                _update(data)
      152 +                bytes = ''.join(_buffer[0])
      153 +                _write(bytes)
      154 +                _update(bytes)
 155  155                  _buffer[:] = [[], 0]
 156  156          # expose this on self, for the occasion when clients want to add data.
 157  157          self._write_data = _write_data

 199  199          self._pack_collection = pack_collection
 200  200          # ATM, We only support this for GCCHK repositories
 201  201          if pack_collection.chk_index is None:
 202      -            raise AssertionError(
 203      -                'pack_collection.chk_index should not be None')
      202 +            raise AssertionError('pack_collection.chk_index should not be None')
 204  203          self._gather_text_refs = False
 205  204          self._chk_id_roots = []
 206  205          self._chk_p_id_roots = []

 210  209 
 211  210      def _get_progress_stream(self, source_vf, keys, message, pb):
 212  211          def pb_stream():
 213      -            substream = source_vf.get_record_stream(
 214      -                keys, 'groupcompress', True)
      212 +            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
 215  213              for idx, record in enumerate(substream):
 216  214                  if pb is not None:
 217  215                      pb.update(message, idx + 1, len(keys))

 221  219      def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
 222  220          """Filter the texts of inventories, to find the chk pages."""
 223  221          total_keys = len(keys)
 224      -
 225  222          def _filtered_inv_stream():
 226  223              id_roots_set = set()
 227  224              p_id_roots_set = set()
 228  225              stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 229  226              for idx, record in enumerate(stream):
 230  227                  # Inventories should always be with revisions; assume success.
 231      -                lines = record.get_bytes_as('lines')
 232      -                chk_inv = inventory.CHKInventory.deserialise(
 233      -                    None, lines, record.key)
      228 +                bytes = record.get_bytes_as('fulltext')
      229 +                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
      230 +                                                             record.key)
 234  231                  if pb is not None:
 235  232                      pb.update('inv', idx, total_keys)
 236  233                  key = chk_inv.id_to_entry.key()

 272  269          counter = [0]
 273  270          if self._gather_text_refs:
 274  271              self._text_refs = set()
 275      -
 276  272          def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
 277  273              cur_keys = root_keys
 278  274              while cur_keys:
 279  275                  keys_by_search_prefix = {}
 280  276                  remaining_keys.difference_update(cur_keys)
 281  277                  next_keys = set()
 282      -
 283  278                  def handle_internal_node(node):
 284      -                    for prefix, value in node._items.items():
      279 +                    for prefix, value in node._items.iteritems():
 285  280                          # We don't want to request the same key twice, and we
 286  281                          # want to order it by the first time it is seen.
 287  282                          # Even further, we don't want to request a key which is

 293  288                          #       always fill them in for stacked branches
 294  289                          if value not in next_keys and value in remaining_keys:
 295  290                              keys_by_search_prefix.setdefault(prefix,
 296      -                                                             []).append(value)
      291 +                                []).append(value)
 297  292                              next_keys.add(value)
 298      -
 299  293                  def handle_leaf_node(node):
 300  294                      # Store is None, because we know we have a LeafNode, and we
 301  295                      # just want its entries
 302  296                      for file_id, bytes in node.iteritems(None):
 303  297                          self._text_refs.add(chk_map._bytes_to_text_key(bytes))
 304      -
 305  298                  def next_stream():
 306  299                      stream = source_vf.get_record_stream(cur_keys,
 307  300                                                           'as-requested', True)

 404  397                       pb_offset):
 405  398          trace.mutter('repacking %d %s', len(keys), message)
 406  399          self.pb.update('repacking %s' % (message,), pb_offset)
 407      -        with ui.ui_factory.nested_progress_bar() as child_pb:
      400 +        child_pb = ui.ui_factory.nested_progress_bar()
      401 +        try:
 408  402              stream = vf_to_stream(source_vf, keys, message, child_pb)
 409      -            for _, _ in target_vf._insert_record_stream(
 410      -                    stream, random_id=True, reuse_blocks=False):
      403 +            for _ in target_vf._insert_record_stream(stream,
      404 +                                                     random_id=True,
      405 +                                                     reuse_blocks=False):
 411  406                  pass
      407 +        finally:
      408 +            child_pb.finished()
 412  409 
 413  410      def _copy_revision_texts(self):
 414  411          source_vf, target_vf = self._build_vfs('revision', True, False)

 426  423          # get_parent_map(self.revision_keys), but that shouldn't be any faster
 427  424          # than this.
 428  425          inventory_keys = source_vf.keys()
 429      -        missing_inventories = set(
 430      -            self.revision_keys).difference(inventory_keys)
      426 +        missing_inventories = set(self.revision_keys).difference(inventory_keys)
 431  427          if missing_inventories:
 432  428              # Go back to the original repo, to see if these are really missing
 433  429              # https://bugs.launchpad.net/bzr/+bug/437003

 440  436              if really_missing:
 441  437                  missing_inventories = sorted(really_missing)
 442  438                  raise ValueError('We are missing inventories for revisions: %s'
 443      -                                 % (missing_inventories,))
      439 +                    % (missing_inventories,))
 444  440          self._copy_stream(source_vf, target_vf, inventory_keys,
 445  441                            'inventories', self._get_filtered_inv_stream, 2)
 446  442 

 457  453                       len(self._chk_id_roots), len(self._chk_p_id_roots),
 458  454                       len(total_keys))
 459  455          self.pb.update('repacking chk', 3)
 460      -        with ui.ui_factory.nested_progress_bar() as child_pb:
      456 +        child_pb = ui.ui_factory.nested_progress_bar()
      457 +        try:
 461  458              for stream in self._get_chk_streams(source_vf, total_keys,
 462  459                                                  pb=child_pb):
 463      -                for _, _ in target_vf._insert_record_stream(
 464      -                        stream, random_id=True, reuse_blocks=False):
      460 +                for _ in target_vf._insert_record_stream(stream,
      461 +                                                         random_id=True,
      462 +                                                         reuse_blocks=False):
 465  463                      pass
      464 +        finally:
      465 +            child_pb.finished()
 466  466 
 467  467      def _copy_text_texts(self):
 468  468          source_vf, target_vf = self._build_vfs('text', True, True)

 486  486          self.pb.update('repacking', 0, 7)
 487  487          self.new_pack = self.open_pack()
 488  488          # Is this necessary for GC ?
 489      -        self.new_pack.set_write_cache_size(1024 * 1024)
      489 +        self.new_pack.set_write_cache_size(1024*1024)
 490  490          self._copy_revision_texts()
 491  491          self._copy_inventory_texts()
 492  492          self._copy_chk_texts()

 502  502              if old_pack.name == self.new_pack._hash.hexdigest():
 503  503                  # The single old pack was already optimally packed.
 504  504                  trace.mutter('single pack %s was already optimally packed',
 505      -                             old_pack.name)
      505 +                    old_pack.name)
 506  506                  self.new_pack.abort()
 507  507                  return None
 508  508          self.pb.update('finishing repack', 6, 7)

 514  514  class GCCHKReconcilePacker(GCCHKPacker):
 515  515      """A packer which regenerates indices etc as it copies.
 516  516 
 517      -    This is used by ``brz reconcile`` to cause parent text pointers to be
      517 +    This is used by ``bzr reconcile`` to cause parent text pointers to be
 518  518      regenerated.
 519  519      """
 520  520 

 543  543          ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
 544  544          # Strip keys back into revision_ids.
 545  545          ancestors = dict((k[0], tuple([p[0] for p in parents]))
 546      -                         for k, parents in ancestor_keys.items())
      546 +                         for k, parents in ancestor_keys.iteritems())
 547  547          del ancestor_keys
 548  548          # TODO: _generate_text_key_index should be much cheaper to generate from
 549  549          #       a chk repository, rather than the current implementation

 552  552          # 2) generate a keys list that contains all the entries that can
 553  553          #    be used as-is, with corrected parents.
 554  554          ok_keys = []
 555      -        new_parent_keys = {}  # (key, parent_keys)
      555 +        new_parent_keys = {} # (key, parent_keys)
 556  556          discarded_keys = []
 557  557          NULL_REVISION = _mod_revision.NULL_REVISION
 558  558          for key in self._text_refs:

 582  582          del ideal_index
 583  583          del file_id_parent_map
 584  584          # 3) bulk copy the data, updating records than need it
 585      -
 586  585          def _update_parents_for_texts():
 587  586              stream = source_vf.get_record_stream(self._text_refs,
 588      -                                                 'groupcompress', False)
      587 +                'groupcompress', False)
 589  588              for record in stream:
 590  589                  if record.key in new_parent_keys:
 591  590                      record.parents = new_parent_keys[record.key]

 599  598 
 600  599  class GCCHKCanonicalizingPacker(GCCHKPacker):
 601  600      """A packer that ensures inventories have canonical-form CHK maps.
 602      -
      601 +    
 603  602      Ideally this would be part of reconcile, but it's very slow and rarely
 604  603      needed.  (It repairs repositories affected by
 605  604      https://bugs.launchpad.net/bzr/+bug/522637).

 615  614          This is useful to get the side-effects of generating a stream.
 616  615          """
 617  616          self.pb.update('scanning %s' % (message,), pb_offset)
 618      -        with ui.ui_factory.nested_progress_bar() as child_pb:
      617 +        child_pb = ui.ui_factory.nested_progress_bar()
      618 +        try:
 619  619              list(vf_to_stream(source_vf, keys, message, child_pb))
      620 +        finally:
      621 +            child_pb.finished()
 620  622 
 621  623      def _copy_inventory_texts(self):
 622  624          source_vf, target_vf = self._build_vfs('inventory', True, True)

 625  627          # First, copy the existing CHKs on the assumption that most of them
 626  628          # will be correct.  This will save us from having to reinsert (and
 627  629          # recompress) these records later at the cost of perhaps preserving a
 628      -        # few unused CHKs.
      630 +        # few unused CHKs. 
 629  631          # (Iterate but don't insert _get_filtered_inv_stream to populate the
 630  632          # variables needed by GCCHKPacker._copy_chk_texts.)
 631  633          self._exhaust_stream(source_vf, inventory_keys, 'inventories',
 632      -                             self._get_filtered_inv_stream, 2)
      634 +                self._get_filtered_inv_stream, 2)
 633  635          GCCHKPacker._copy_chk_texts(self)
 634  636          # Now copy and fix the inventories, and any regenerated CHKs.
 635      -
 636  637          def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
 637  638              return self._get_filtered_canonicalizing_inv_stream(
 638  639                  source_vf, keys, message, pb, source_chk_vf, target_chk_vf)

 644  645          pass
 645  646 
 646  647      def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
 647      -                                                pb=None, source_chk_vf=None, target_chk_vf=None):
      648 +            pb=None, source_chk_vf=None, target_chk_vf=None):
 648  649          """Filter the texts of inventories, regenerating CHKs to make sure they
 649  650          are canonical.
 650  651          """
 651  652          total_keys = len(keys)
 652  653          target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
 653      -
 654  654          def _filtered_inv_stream():
 655  655              stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 656  656              search_key_name = None
 657  657              for idx, record in enumerate(stream):
 658  658                  # Inventories should always be with revisions; assume success.
 659      -                lines = record.get_bytes_as('lines')
      659 +                bytes = record.get_bytes_as('fulltext')
 660  660                  chk_inv = inventory.CHKInventory.deserialise(
 661      -                    source_chk_vf, lines, record.key)
      661 +                    source_chk_vf, bytes, record.key)
 662  662                  if pb is not None:
 663  663                      pb.update('inv', idx, total_keys)
 664  664                  chk_inv.id_to_entry._ensure_root()
 665  665                  if search_key_name is None:
 666  666                      # Find the name corresponding to the search_key_func
 667  667                      search_key_reg = chk_map.search_key_registry
 668      -                    for search_key_name, func in search_key_reg.items():
      668 +                    for search_key_name, func in search_key_reg.iteritems():
 669  669                          if func == chk_inv.id_to_entry._search_key_func:
 670  670                              break
 671  671                  canonical_inv = inventory.CHKInventory.from_inventory(

 676  676                      trace.mutter(
 677  677                          'Non-canonical CHK map for id_to_entry of inv: %s '
 678  678                          '(root is %s, should be %s)' % (chk_inv.revision_id,
 679      -                                                        chk_inv.id_to_entry.key()[
 680      -                                                            0],
 681      -                                                        canonical_inv.id_to_entry.key()[0]))
      679 +                        chk_inv.id_to_entry.key()[0],
      680 +                        canonical_inv.id_to_entry.key()[0]))
 682  681                      self._data_changed = True
 683  682                  p_id_map = chk_inv.parent_id_basename_to_file_id
 684  683                  p_id_map._ensure_root()

 690  689                          % (chk_inv.revision_id, p_id_map.key()[0],
 691  690                             canon_p_id_map.key()[0]))
 692  691                      self._data_changed = True
 693      -                yield versionedfile.ChunkedContentFactory(
 694      -                    record.key, record.parents, record.sha1, canonical_inv.to_lines(),
 695      -                    chunks_are_lines=True)
      692 +                yield versionedfile.ChunkedContentFactory(record.key,
      693 +                        record.parents, record.sha1,
      694 +                        canonical_inv.to_lines())
 696  695              # We have finished processing all of the inventory records, we
 697  696              # don't need these sets anymore
 698  697          return _filtered_inv_stream()

 736  735          missing_corresponding.difference_update(corresponding_invs)
 737  736          if missing_corresponding:
 738  737              problems.append("inventories missing for revisions %s" %
 739      -                            (sorted(missing_corresponding),))
      738 +                (sorted(missing_corresponding),))
 740  739              return problems
 741  740          # Are any chk root entries missing for any inventories?  This includes
 742  741          # any present parent inventories, which may be used when calculating
 743  742          # deltas for streaming.
 744  743          all_inv_keys = set(corresponding_invs)
 745      -        for parent_inv_keys in inv_parent_map.values():
      744 +        for parent_inv_keys in inv_parent_map.itervalues():
 746  745              all_inv_keys.update(parent_inv_keys)
 747  746          # Filter out ghost parents.
 748  747          all_inv_keys.intersection_update(

 761  760          if missing_chk_roots:
 762  761              problems.append(
 763  762                  "missing referenced chk root keys: %s."
 764      -                "Run 'brz reconcile --canonicalize-chks' on the affected "
      763 +                "Run 'bzr reconcile --canonicalize-chks' on the affected "
 765  764                  "repository."
 766  765                  % (sorted(missing_chk_roots),))
 767  766              # Don't bother checking any further.

 779  778              for record in _filter_text_keys(chk_diff, text_keys,
 780  779                                              chk_map._bytes_to_text_key):
 781  780                  pass
 782      -        except errors.NoSuchRevision as e:
      781 +        except errors.NoSuchRevision, e:
 783  782              # XXX: It would be nice if we could give a more precise error here.
 784  783              problems.append("missing chk node(s) for id_to_entry maps")
 785  784          chk_diff = chk_map.iter_interesting_nodes(

 788  787          try:
 789  788              for interesting_rec, interesting_map in chk_diff:
 790  789                  pass
 791      -        except errors.NoSuchRevision as e:
      790 +        except errors.NoSuchRevision, e:
 792  791              problems.append(
 793  792                  "missing chk node(s) for parent_id_basename_to_file_id maps")
 794  793          present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
 795  794          missing_text_keys = text_keys.difference(present_text_keys)
 796  795          if missing_text_keys:
 797  796              problems.append("missing text keys: %r"
 798      -                            % (sorted(missing_text_keys),))
      797 +                % (sorted(missing_text_keys),))
 799  798          return problems
 800  799 
 801  800 
 802  801  class CHKInventoryRepository(PackRepository):
 803  802      """subclass of PackRepository that uses CHK based inventories."""
 804  803 
 805      -    def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
 806      -                 _serializer):
      804 +    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
      805 +        _serializer):
 807  806          """Overridden to change pack collection class."""
 808      -        super(CHKInventoryRepository, self).__init__(_format, a_controldir,
 809      -                                                     control_files, _commit_builder_class, _serializer)
      807 +        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
      808 +            control_files, _commit_builder_class, _serializer)
 810  809          index_transport = self._transport.clone('indices')
 811  810          self._pack_collection = GCRepositoryPackCollection(self,
 812      -                                                           self._transport, index_transport,
 813      -                                                           self._transport.clone(
 814      -                                                               'upload'),
 815      -                                                           self._transport.clone(
 816      -                                                               'packs'),
 817      -                                                           _format.index_builder_class,
 818      -                                                           _format.index_class,
 819      -                                                           use_chk_index=self._format.supports_chks,
 820      -                                                           )
      811 +            self._transport, index_transport,
      812 +            self._transport.clone('upload'),
      813 +            self._transport.clone('packs'),
      814 +            _format.index_builder_class,
      815 +            _format.index_class,
      816 +            use_chk_index=self._format.supports_chks,
      817 +            )
 821  818          self.inventories = GroupCompressVersionedFiles(
 822  819              _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 823      -                          add_callback=self._pack_collection.inventory_index.add_callback,
 824      -                          parents=True, is_locked=self.is_locked,
 825      -                          inconsistency_fatal=False),
      820 +                add_callback=self._pack_collection.inventory_index.add_callback,
      821 +                parents=True, is_locked=self.is_locked,
      822 +                inconsistency_fatal=False),
 826  823              access=self._pack_collection.inventory_index.data_access)
 827  824          self.revisions = GroupCompressVersionedFiles(
 828  825              _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 829      -                          add_callback=self._pack_collection.revision_index.add_callback,
 830      -                          parents=True, is_locked=self.is_locked,
 831      -                          track_external_parent_refs=True, track_new_keys=True),
      826 +                add_callback=self._pack_collection.revision_index.add_callback,
      827 +                parents=True, is_locked=self.is_locked,
      828 +                track_external_parent_refs=True, track_new_keys=True),
 832  829              access=self._pack_collection.revision_index.data_access,
 833  830              delta=False)
 834  831          self.signatures = GroupCompressVersionedFiles(
 835  832              _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 836      -                          add_callback=self._pack_collection.signature_index.add_callback,
 837      -                          parents=False, is_locked=self.is_locked,
 838      -                          inconsistency_fatal=False),
      833 +                add_callback=self._pack_collection.signature_index.add_callback,
      834 +                parents=False, is_locked=self.is_locked,
      835 +                inconsistency_fatal=False),
 839  836              access=self._pack_collection.signature_index.data_access,
 840  837              delta=False)
 841  838          self.texts = GroupCompressVersionedFiles(
 842  839              _GCGraphIndex(self._pack_collection.text_index.combined_index,
 843      -                          add_callback=self._pack_collection.text_index.add_callback,
 844      -                          parents=True, is_locked=self.is_locked,
 845      -                          inconsistency_fatal=False),
      840 +                add_callback=self._pack_collection.text_index.add_callback,
      841 +                parents=True, is_locked=self.is_locked,
      842 +                inconsistency_fatal=False),
 846  843              access=self._pack_collection.text_index.data_access)
 847  844          # No parents, individual CHK pages don't have specific ancestry
 848  845          self.chk_bytes = GroupCompressVersionedFiles(
 849  846              _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 850      -                          add_callback=self._pack_collection.chk_index.add_callback,
 851      -                          parents=False, is_locked=self.is_locked,
 852      -                          inconsistency_fatal=False),
      847 +                add_callback=self._pack_collection.chk_index.add_callback,
      848 +                parents=False, is_locked=self.is_locked,
      849 +                inconsistency_fatal=False),
 853  850              access=self._pack_collection.chk_index.data_access)
 854  851          search_key_name = self._format._serializer.search_key_name
 855  852          search_key_func = chk_map.search_key_registry.get(search_key_name)

 876  873          # make inventory
 877  874          serializer = self._format._serializer
 878  875          result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
 879      -                                                       maximum_size=serializer.maximum_size,
 880      -                                                       search_key_name=serializer.search_key_name)
      876 +            maximum_size=serializer.maximum_size,
      877 +            search_key_name=serializer.search_key_name)
 881  878          inv_lines = result.to_lines()
 882  879          return self._inventory_add_lines(revision_id, parents,
 883      -                                         inv_lines, check_content=False)
      880 +            inv_lines, check_content=False)
 884  881 
 885  882      def _create_inv_from_null(self, delta, revision_id):
 886  883          """This will mutate new_inv directly.

 904  901                                   ' no new_path %r' % (file_id,))
 905  902              if new_path == '':
 906  903                  new_inv.root_id = file_id
 907      -                parent_id_basename_key = StaticTuple(b'', b'').intern()
      904 +                parent_id_basename_key = StaticTuple('', '').intern()
 908  905              else:
 909  906                  utf8_entry_name = entry.name.encode('utf-8')
 910  907                  parent_id_basename_key = StaticTuple(entry.parent_id,

 917  914              parent_id_basename_dict[parent_id_basename_key] = file_id
 918  915 
 919  916          new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
 920      -                                     parent_id_basename_dict, maximum_size=serializer.maximum_size)
      917 +            parent_id_basename_dict, maximum_size=serializer.maximum_size)
 921  918          return new_inv
 922  919 
 923  920      def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,

 949  946              raise AssertionError("%r not in write group" % (self,))
 950  947          _mod_revision.check_not_reserved_id(new_revision_id)
 951  948          basis_tree = None
 952      -        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
      949 +        if basis_inv is None:
 953  950              if basis_revision_id == _mod_revision.NULL_REVISION:
 954  951                  new_inv = self._create_inv_from_null(delta, new_revision_id)
 955  952                  if new_inv.root_id is None:
 956  953                      raise errors.RootMissing()
 957  954                  inv_lines = new_inv.to_lines()
 958  955                  return self._inventory_add_lines(new_revision_id, parents,
 959      -                                                 inv_lines, check_content=False), new_inv
      956 +                    inv_lines, check_content=False), new_inv
 960  957              else:
 961  958                  basis_tree = self.revision_tree(basis_revision_id)
 962  959                  basis_tree.lock_read()
 963  960                  basis_inv = basis_tree.root_inventory
 964  961          try:
 965  962              result = basis_inv.create_by_apply_delta(delta, new_revision_id,
 966      -                                                     propagate_caches=propagate_caches)
      963 +                propagate_caches=propagate_caches)
 967  964              inv_lines = result.to_lines()
 968  965              return self._inventory_add_lines(new_revision_id, parents,
 969      -                                             inv_lines, check_content=False), result
      966 +                inv_lines, check_content=False), result
 970  967          finally:
 971  968              if basis_tree is not None:
 972  969                  basis_tree.unlock()
 973  970 
 974      -    def _deserialise_inventory(self, revision_id, lines):
 975      -        return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
 976      -                                                  (revision_id,))
      971 +    def _deserialise_inventory(self, revision_id, bytes):
      972 +        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
      973 +            (revision_id,))
 977  974 
 978  975      def _iter_inventories(self, revision_ids, ordering):
 979  976          """Iterate over many inventory objects."""

 984  981          texts = {}
 985  982          for record in stream:
 986  983              if record.storage_kind != 'absent':
 987      -                texts[record.key] = record.get_bytes_as('lines')
      984 +                texts[record.key] = record.get_bytes_as('fulltext')
 988  985              else:
 989  986                  texts[record.key] = None
 990  987          for key in keys:
 991      -            lines = texts[key]
 992      -            if lines is None:
      988 +            bytes = texts[key]
      989 +            if bytes is None:
 993  990                  yield (None, key[-1])
 994  991              else:
 995  992                  yield (inventory.CHKInventory.deserialise(
 996      -                    self.chk_bytes, lines, key), key[-1])
      993 +                    self.chk_bytes, bytes, key), key[-1])
 997  994 
 998  995      def _get_inventory_xml(self, revision_id):
 999  996          """Get serialized inventory as a string."""

1002  999          # it allowing _get_inventory_xml to work. Bundles currently use the
1003 1000          # serializer directly; this also isn't ideal, but there isn't an xml
1004 1001          # iteration interface offered at all for repositories.
1005      -        return self._serializer.write_inventory_to_lines(
     1002 +        return self._serializer.write_inventory_to_string(
1006 1003              self.get_inventory(revision_id))
1007 1004 
1008 1005      def _find_present_inventory_keys(self, revision_keys):

1023 1020          rich_root = self.supports_rich_root()
1024 1021          bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
1025 1022          file_id_revisions = {}
1026      -        with ui.ui_factory.nested_progress_bar() as pb:
     1023 +        pb = ui.ui_factory.nested_progress_bar()
     1024 +        try:
1027 1025              revision_keys = [(r,) for r in revision_ids]
1028 1026              parent_keys = self._find_parent_keys_of_revisions(revision_keys)
1029 1027              # TODO: instead of using _find_present_inventory_keys, change the

1031 1029              #       However, we only want to tolerate missing parent
1032 1030              #       inventories, not missing inventories for revision_ids
1033 1031              present_parent_inv_keys = self._find_present_inventory_keys(
1034      -                parent_keys)
1035      -            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
     1032 +                                        parent_keys)
     1033 +            present_parent_inv_ids = set(
     1034 +                [k[-1] for k in present_parent_inv_keys])
1036 1035              inventories_to_read = set(revision_ids)
1037 1036              inventories_to_read.update(present_parent_inv_ids)
1038 1037              root_key_info = _build_interesting_key_sets(

1041 1040              uninteresting_root_keys = root_key_info.uninteresting_root_keys
1042 1041              chk_bytes = self.chk_bytes
1043 1042              for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1044      -                                                                interesting_root_keys, uninteresting_root_keys,
1045      -                                                                pb=pb):
     1043 +                        interesting_root_keys, uninteresting_root_keys,
     1044 +                        pb=pb):
1046 1045                  for name, bytes in items:
1047 1046                      (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
1048 1047                      # TODO: consider interning file_id, revision_id here, or

1054 1053                      try:
1055 1054                          file_id_revisions[file_id].add(revision_id)
1056 1055                      except KeyError:
1057      -                        file_id_revisions[file_id] = {revision_id}
     1056 +                        file_id_revisions[file_id] = set([revision_id])
     1057 +        finally:
     1058 +            pb.finished()
1058 1059          return file_id_revisions
1059 1060 
1060 1061      def find_text_key_references(self):

1072 1073          revision_keys = self.revisions.keys()
1073 1074          result = {}
1074 1075          rich_roots = self.supports_rich_root()
1075      -        with ui.ui_factory.nested_progress_bar() as pb:
     1076 +        pb = ui.ui_factory.nested_progress_bar()
     1077 +        try:
1076 1078              all_revs = self.all_revision_ids()
1077 1079              total = len(all_revs)
1078 1080              for pos, inv in enumerate(self.iter_inventories(all_revs)):

1085 1087                      if entry.revision == inv.revision_id:
1086 1088                          result[key] = True
1087 1089              return result
     1090 +        finally:
     1091 +            pb.finished()
1088 1092 
     1093 +    @needs_write_lock
1089 1094      def reconcile_canonicalize_chks(self):
1090 1095          """Reconcile this repository to make sure all CHKs are in canonical
1091 1096          form.
1092 1097          """
1093      -        from .reconcile import PackReconciler
1094      -        with self.lock_write():
1095      -            reconciler = PackReconciler(
1096      -                self, thorough=True, canonicalize_chks=True)
1097      -            return reconciler.reconcile()
     1098 +        from brzlib.reconcile import PackReconciler
     1099 +        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
     1100 +        reconciler.reconcile()
     1101 +        return reconciler
1098 1102 
1099 1103      def _reconcile_pack(self, collection, packs, extension, revs, pb):
1100 1104          packer = GCCHKReconcilePacker(collection, packs, extension)

1127 1131              raise AssertionError()
1128 1132          vf = self.revisions
1129 1133          if revisions_iterator is None:
1130      -            revisions_iterator = self.iter_revisions(self.all_revision_ids())
     1134 +            revisions_iterator = self._iter_revisions(None)
1131 1135          for revid, revision in revisions_iterator:
1132 1136              if revision is None:
1133 1137                  pass
1134 1138              parent_map = vf.get_parent_map([(revid,)])
1135 1139              parents_according_to_index = tuple(parent[-1] for parent in
1136      -                                               parent_map[(revid,)])
     1140 +                parent_map[(revid,)])
1137 1141              parents_according_to_revision = tuple(revision.parent_ids)
1138 1142              if parents_according_to_index != parents_according_to_revision:
1139 1143                  yield (revid, parents_according_to_index,
1140      -                       parents_according_to_revision)
     1144 +                    parents_according_to_revision)
1141 1145 
1142 1146      def _check_for_inconsistent_revision_parents(self):
1143 1147          inconsistencies = list(self._find_inconsistent_revision_parents())

1166 1170          """
1167 1171          self._chk_id_roots = []
1168 1172          self._chk_p_id_roots = []
1169      -
1170 1173          def _filtered_inv_stream():
1171 1174              id_roots_set = set()
1172 1175              p_id_roots_set = set()

1179 1182                          continue
1180 1183                      else:
1181 1184                          raise errors.NoSuchRevision(self, record.key)
1182      -                lines = record.get_bytes_as('lines')
1183      -                chk_inv = inventory.CHKInventory.deserialise(None, lines,
     1185 +                bytes = record.get_bytes_as('fulltext')
     1186 +                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
1184 1187                                                               record.key)
1185 1188                  key = chk_inv.id_to_entry.key()
1186 1189                  if key not in id_roots_set:

1212 1215              # TODO: Update Repository.iter_inventories() to add
1213 1216              #       ignore_missing=True
1214 1217              present_keys = self.from_repository._find_present_inventory_keys(
1215      -                excluded_revision_keys)
     1218 +                            excluded_revision_keys)
1216 1219              present_ids = [k[-1] for k in present_keys]
1217 1220              uninteresting_root_keys = set()
1218 1221              uninteresting_pid_root_keys = set()

1221 1224                  uninteresting_pid_root_keys.add(
1222 1225                      inv.parent_id_basename_to_file_id.key())
1223 1226          chk_bytes = self.from_repository.chk_bytes
1224      -
1225 1227          def _filter_id_to_entry():
1226 1228              interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
1227      -                                                               self._chk_id_roots, uninteresting_root_keys)
     1229 +                        self._chk_id_roots, uninteresting_root_keys)
1228 1230              for record in _filter_text_keys(interesting_nodes, self._text_keys,
1229      -                                            chk_map._bytes_to_text_key):
     1231 +                    chk_map._bytes_to_text_key):
1230 1232                  if record is not None:
1231 1233                      yield record
1232 1234              # Consumed
1233 1235              self._chk_id_roots = None
1234 1236          yield 'chk_bytes', _filter_id_to_entry()
1235      -
1236 1237          def _get_parent_id_basename_to_file_id_pages():
1237 1238              for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1238      -                                                                self._chk_p_id_roots, uninteresting_pid_root_keys):
     1239 +                        self._chk_p_id_roots, uninteresting_pid_root_keys):
1239 1240                  if record is not None:
1240 1241                      yield record
1241 1242              # Consumed

1246 1247          # Note: We know we don't have to handle adding root keys, because both
1247 1248          # the source and target are the identical network name.
1248 1249          text_stream = self.from_repository.texts.get_record_stream(
1249      -            self._text_keys, self._text_fetch_order, False)
     1250 +                        self._text_keys, self._text_fetch_order, False)
1250 1251          return ('texts', text_stream)
1251 1252 
1252 1253      def get_stream(self, search):

1262 1263                  yield record
1263 1264 
1264 1265          revision_ids = search.get_keys()
1265      -        with ui.ui_factory.nested_progress_bar() as pb:
1266      -            rc = self._record_counter
1267      -            self._record_counter.setup(len(revision_ids))
1268      -            for stream_info in self._fetch_revision_texts(revision_ids):
1269      -                yield (stream_info[0],
1270      -                       wrap_and_count(pb, rc, stream_info[1]))
1271      -            self._revision_keys = [(rev_id,) for rev_id in revision_ids]
1272      -            # TODO: The keys to exclude might be part of the search recipe
1273      -            # For now, exclude all parents that are at the edge of ancestry, for
1274      -            # which we have inventories
1275      -            from_repo = self.from_repository
1276      -            parent_keys = from_repo._find_parent_keys_of_revisions(
1277      -                self._revision_keys)
1278      -            self.from_repository.revisions.clear_cache()
1279      -            self.from_repository.signatures.clear_cache()
1280      -            # Clear the repo's get_parent_map cache too.
1281      -            self.from_repository._unstacked_provider.disable_cache()
1282      -            self.from_repository._unstacked_provider.enable_cache()
1283      -            s = self._get_inventory_stream(self._revision_keys)
1284      -            yield (s[0], wrap_and_count(pb, rc, s[1]))
1285      -            self.from_repository.inventories.clear_cache()
1286      -            for stream_info in self._get_filtered_chk_streams(parent_keys):
1287      -                yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
1288      -            self.from_repository.chk_bytes.clear_cache()
1289      -            s = self._get_text_stream()
1290      -            yield (s[0], wrap_and_count(pb, rc, s[1]))
1291      -            self.from_repository.texts.clear_cache()
1292      -            pb.update('Done', rc.max, rc.max)
     1266 +        pb = ui.ui_factory.nested_progress_bar()
     1267 +        rc = self._record_counter
     1268 +        self._record_counter.setup(len(revision_ids))
     1269 +        for stream_info in self._fetch_revision_texts(revision_ids):
     1270 +            yield (stream_info[0],
     1271 +                wrap_and_count(pb, rc, stream_info[1]))
     1272 +        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
     1273 +        # TODO: The keys to exclude might be part of the search recipe
     1274 +        # For now, exclude all parents that are at the edge of ancestry, for
     1275 +        # which we have inventories
     1276 +        from_repo = self.from_repository
     1277 +        parent_keys = from_repo._find_parent_keys_of_revisions(
     1278 +                        self._revision_keys)
     1279 +        self.from_repository.revisions.clear_cache()
     1280 +        self.from_repository.signatures.clear_cache()
     1281 +        # Clear the repo's get_parent_map cache too.
     1282 +        self.from_repository._unstacked_provider.disable_cache()
     1283 +        self.from_repository._unstacked_provider.enable_cache()
     1284 +        s = self._get_inventory_stream(self._revision_keys)
     1285 +        yield (s[0], wrap_and_count(pb, rc, s[1]))
     1286 +        self.from_repository.inventories.clear_cache()
     1287 +        for stream_info in self._get_filtered_chk_streams(parent_keys):
     1288 +            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
     1289 +        self.from_repository.chk_bytes.clear_cache()
     1290 +        s = self._get_text_stream()
     1291 +        yield (s[0], wrap_and_count(pb, rc, s[1]))
     1292 +        self.from_repository.texts.clear_cache()
     1293 +        pb.update('Done', rc.max, rc.max)
     1294 +        pb.finished()
1293 1295 
1294 1296      def get_stream_for_missing_keys(self, missing_keys):
1295 1297          # missing keys can only occur when we are byte copying and not

1299 1301          for key in missing_keys:
1300 1302              if key[0] != 'inventories':
1301 1303                  raise AssertionError('The only missing keys we should'
1302      -                                     ' be filling in are inventory keys, not %s'
1303      -                                     % (key[0],))
     1304 +                    ' be filling in are inventory keys, not %s'
     1305 +                    % (key[0],))
1304 1306              missing_inventory_keys.add(key[1:])
1305 1307          if self._chk_id_roots or self._chk_p_id_roots:
1306 1308              raise AssertionError('Cannot call get_stream_for_missing_keys'
1307      -                                 ' until all of get_stream() has been consumed.')
     1309 +                ' until all of get_stream() has been consumed.')
1308 1310          # Yield the inventory stream, so we can find the chk stream
1309 1311          # Some of the missing_keys will be missing because they are ghosts.
1310 1312          # As such, we can ignore them. The Sink is required to verify there are

1355 1357      """
1356 1358      text_keys_update = text_keys.update
1357 1359      for record, items in interesting_nodes_iterable:
1358      -        text_keys_update([bytes_to_text_key(b) for n, b in items])
     1360 +        text_keys_update([bytes_to_text_key(b) for n,b in items])
1359 1361          yield record
1360 1362 
1361 1363 

1365 1367      repository_class = CHKInventoryRepository
1366 1368      supports_external_lookups = True
1367 1369      supports_chks = True
1368      -    _commit_builder_class = PackCommitBuilder
     1370 +    _commit_builder_class = PackRootCommitBuilder
1369 1371      rich_root_data = True
1370 1372      _serializer = chk_serializer.chk_bencode_serializer
1371 1373      _commit_inv_deltas = True

1379 1381      # multiple in-a-row (and sharing strings). Topological is better
1380 1382      # for remote, because we access less data.
1381 1383      _fetch_order = 'unordered'
1382      -    # essentially ignored by the groupcompress code.
1383      -    _fetch_uses_deltas = False
     1384 +    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
1384 1385      fast_deltas = True
1385 1386      pack_compresses = True
1386      -    supports_tree_reference = True
1387 1387 
1388 1388      def _get_matching_bzrdir(self):
1389      -        return controldir.format_registry.make_controldir('2a')
     1389 +        return controldir.format_registry.make_bzrdir('2a')
1390 1390 
1391 1391      def _ignore_setting_bzrdir(self, format):
1392 1392          pass
1393 1393 
1394      -    _matchingcontroldir = property(
1395      -        _get_matching_bzrdir, _ignore_setting_bzrdir)
     1394 +    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1396 1395 
1397 1396      @classmethod
1398 1397      def get_format_string(cls):
1399      -        return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'
     1398 +        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
1400 1399 
1401 1400      def get_format_description(self):
1402 1401          """See RepositoryFormat.get_format_description()."""
1403 1402          return ("Repository format 2a - rich roots, group compression"
1404      -                " and chk inventories")
     1403 +            " and chk inventories")
1405 1404 
1406 1405 
1407 1406  class RepositoryFormat2aSubtree(RepositoryFormat2a):

1410 1409      """
1411 1410 
1412 1411      def _get_matching_bzrdir(self):
1413      -        return controldir.format_registry.make_controldir('development-subtree')
     1412 +        return controldir.format_registry.make_bzrdir('development-subtree')
1414 1413 
1415 1414      def _ignore_setting_bzrdir(self, format):
1416 1415          pass
1417 1416 
1418      -    _matchingcontroldir = property(
1419      -        _get_matching_bzrdir, _ignore_setting_bzrdir)
     1417 +    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1420 1418 
1421 1419      @classmethod
1422 1420      def get_format_string(cls):
1423      -        return b'Bazaar development format 8\n'
     1421 +        return ('Bazaar development format 8\n')
1424 1422 
1425 1423      def get_format_description(self):
1426 1424          """See RepositoryFormat.get_format_description()."""