/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Robert Collins
  • Date: 2008-02-13 03:30:01 UTC
  • mfrom: (3221 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3224.
  • Revision ID: robertc@robertcollins.net-20080213033001-rw70ul0zb02ph856
Merge to fix conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
 
24
24
from bzrlib import (
25
25
        debug,
 
26
        graph,
26
27
        pack,
27
28
        ui,
28
29
        )
37
38
from bzrlib.osutils import rand_chars
38
39
from bzrlib.pack import ContainerWriter
39
40
from bzrlib.store import revision
 
41
from bzrlib import tsort
40
42
""")
41
43
from bzrlib import (
42
44
    bzrdir,
46
48
    lockable_files,
47
49
    lockdir,
48
50
    osutils,
 
51
    symbol_versioning,
49
52
    transactions,
50
53
    xml5,
 
54
    xml6,
51
55
    xml7,
52
56
    )
53
57
 
72
76
    added text, reducing memory and object pressure.
73
77
    """
74
78
 
 
79
    def __init__(self, repository, parents, config, timestamp=None,
 
80
                 timezone=None, committer=None, revprops=None,
 
81
                 revision_id=None):
 
82
        CommitBuilder.__init__(self, repository, parents, config,
 
83
            timestamp=timestamp, timezone=timezone, committer=committer,
 
84
            revprops=revprops, revision_id=revision_id)
 
85
        self._file_graph = graph.Graph(
 
86
            repository._pack_collection.text_index.combined_index)
 
87
 
75
88
    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
76
89
        return self.repository._pack_collection._add_text_to_weave(file_id,
77
90
            self._new_revision_id, new_lines, parents, nostore_sha,
78
91
            self.random_revid)
79
92
 
 
93
    def _heads(self, file_id, revision_ids):
 
94
        keys = [(file_id, revision_id) for revision_id in revision_ids]
 
95
        return set([key[1] for key in self._file_graph.heads(keys)])
 
96
 
80
97
 
81
98
class PackRootCommitBuilder(RootCommitBuilder):
82
99
    """A subclass of RootCommitBuilder to add texts with pack semantics.
85
102
    added text, reducing memory and object pressure.
86
103
    """
87
104
 
 
105
    def __init__(self, repository, parents, config, timestamp=None,
 
106
                 timezone=None, committer=None, revprops=None,
 
107
                 revision_id=None):
 
108
        CommitBuilder.__init__(self, repository, parents, config,
 
109
            timestamp=timestamp, timezone=timezone, committer=committer,
 
110
            revprops=revprops, revision_id=revision_id)
 
111
        self._file_graph = graph.Graph(
 
112
            repository._pack_collection.text_index.combined_index)
 
113
 
88
114
    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
89
115
        return self.repository._pack_collection._add_text_to_weave(file_id,
90
116
            self._new_revision_id, new_lines, parents, nostore_sha,
91
117
            self.random_revid)
92
118
 
 
119
    def _heads(self, file_id, revision_ids):
 
120
        keys = [(file_id, revision_id) for revision_id in revision_ids]
 
121
        return set([key[1] for key in self._file_graph.heads(keys)])
 
122
 
93
123
 
94
124
class Pack(object):
95
125
    """An in memory proxy for a pack and its indices.
145
175
        """The text index is the name + .tix."""
146
176
        return self.index_name('text', name)
147
177
 
 
178
    def _external_compression_parents_of_texts(self):
 
179
        keys = set()
 
180
        refs = set()
 
181
        for node in self.text_index.iter_all_entries():
 
182
            keys.add(node[1])
 
183
            refs.update(node[3][1])
 
184
        return refs - keys
 
185
 
148
186
 
149
187
class ExistingPack(Pack):
150
188
    """An in memory proxy for an existing .pack and its disk indices."""
187
225
        }
188
226
 
189
227
    def __init__(self, upload_transport, index_transport, pack_transport,
190
 
        upload_suffix=''):
 
228
        upload_suffix='', file_mode=None):
191
229
        """Create a NewPack instance.
192
230
 
193
231
        :param upload_transport: A writable transport for the pack to be
199
237
            upload_transport.clone('../packs').
200
238
        :param upload_suffix: An optional suffix to be given to any temporary
201
239
            files created during the pack creation. e.g '.autopack'
 
240
        :param file_mode: An optional file mode to create the new files with.
202
241
        """
203
242
        # The relative locations of the packs are constrained, but all are
204
243
        # passed in because the caller has them, so as to avoid object churn.
223
262
        self.index_transport = index_transport
224
263
        # where is the pack renamed to when it is finished?
225
264
        self.pack_transport = pack_transport
 
265
        # What file mode to upload the pack and indices with.
 
266
        self._file_mode = file_mode
226
267
        # tracks the content written to the .pack file.
227
268
        self._hash = md5.new()
228
269
        # a four-tuple with the length in bytes of the indices, once the pack
239
280
        self.start_time = time.time()
240
281
        # open an output stream for the data added to the pack.
241
282
        self.write_stream = self.upload_transport.open_write_stream(
242
 
            self.random_name)
 
283
            self.random_name, mode=self._file_mode)
243
284
        if 'pack' in debug.debug_flags:
244
285
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
245
286
                time.ctime(), self.upload_transport.base, self.random_name,
342
383
                self.pack_transport, self.name,
343
384
                time.time() - self.start_time)
344
385
 
 
386
    def flush(self):
 
387
        """Flush any current data."""
 
388
        if self._buffer[1]:
 
389
            bytes = ''.join(self._buffer[0])
 
390
            self.write_stream.write(bytes)
 
391
            self._hash.update(bytes)
 
392
            self._buffer[:] = [[], 0]
 
393
 
345
394
    def index_name(self, index_type, name):
346
395
        """Get the disk name of an index type for pack name 'name'."""
347
396
        return name + NewPack.index_definitions[index_type][0]
368
417
        """
369
418
        index_name = self.index_name(index_type, self.name)
370
419
        self.index_sizes[self.index_offset(index_type)] = \
371
 
            self.index_transport.put_file(index_name, index.finish())
 
420
            self.index_transport.put_file(index_name, index.finish(),
 
421
            mode=self._file_mode)
372
422
        if 'pack' in debug.debug_flags:
373
423
            # XXX: size might be interesting?
374
424
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
490
540
        self.packs = packs
491
541
        self.suffix = suffix
492
542
        self.revision_ids = revision_ids
 
543
        # The pack object we are creating.
 
544
        self.new_pack = None
493
545
        self._pack_collection = pack_collection
 
546
        # The index layer keys for the revisions being copied. None for 'all
 
547
        # objects'.
 
548
        self._revision_keys = None
 
549
        # What text keys to copy. None for 'all texts'. This is set by
 
550
        # _copy_inventory_texts
 
551
        self._text_filter = None
 
552
        self._extra_init()
 
553
 
 
554
    def _extra_init(self):
 
555
        """A template hook to allow extending the constructor trivially."""
494
556
 
495
557
    def pack(self, pb=None):
496
558
        """Create a new pack by reading data from other packs.
533
595
        """Open a pack for the pack we are creating."""
534
596
        return NewPack(self._pack_collection._upload_transport,
535
597
            self._pack_collection._index_transport,
536
 
            self._pack_collection._pack_transport, upload_suffix=self.suffix)
 
598
            self._pack_collection._pack_transport, upload_suffix=self.suffix,
 
599
            file_mode=self._pack_collection.repo.control_files._file_mode)
537
600
 
538
 
    def _create_pack_from_packs(self):
539
 
        self.pb.update("Opening pack", 0, 5)
540
 
        new_pack = self.open_pack()
541
 
        # buffer data - we won't be reading-back during the pack creation and
542
 
        # this makes a significant difference on sftp pushes.
543
 
        new_pack.set_write_cache_size(1024*1024)
544
 
        if 'pack' in debug.debug_flags:
545
 
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
546
 
                for a_pack in self.packs]
547
 
            if self.revision_ids is not None:
548
 
                rev_count = len(self.revision_ids)
549
 
            else:
550
 
                rev_count = 'all'
551
 
            mutter('%s: create_pack: creating pack from source packs: '
552
 
                '%s%s %s revisions wanted %s t=0',
553
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
554
 
                plain_pack_list, rev_count)
 
601
    def _copy_revision_texts(self):
 
602
        """Copy revision data to the new pack."""
555
603
        # select revisions
556
604
        if self.revision_ids:
557
605
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
558
606
        else:
559
607
            revision_keys = None
560
 
 
561
608
        # select revision keys
562
609
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
563
610
            self.packs, 'revision_index')[0]
564
611
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
565
612
        # copy revision keys and adjust values
566
613
        self.pb.update("Copying revision texts", 1)
567
 
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
568
 
            new_pack._writer, new_pack.revision_index))
 
614
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
 
615
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
 
616
            self.new_pack.revision_index, readv_group_iter, total_items))
569
617
        if 'pack' in debug.debug_flags:
570
618
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
571
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
572
 
                new_pack.revision_index.key_count(),
573
 
                time.time() - new_pack.start_time)
 
619
                time.ctime(), self._pack_collection._upload_transport.base,
 
620
                self.new_pack.random_name,
 
621
                self.new_pack.revision_index.key_count(),
 
622
                time.time() - self.new_pack.start_time)
 
623
        self._revision_keys = revision_keys
 
624
 
 
625
    def _copy_inventory_texts(self):
 
626
        """Copy the inventory texts to the new pack.
 
627
 
 
628
        self._revision_keys is used to determine what inventories to copy.
 
629
 
 
630
        Sets self._text_filter appropriately.
 
631
        """
574
632
        # select inventory keys
575
 
        inv_keys = revision_keys # currently the same keyspace, and note that
 
633
        inv_keys = self._revision_keys # currently the same keyspace, and note that
576
634
        # querying for keys here could introduce a bug where an inventory item
577
635
        # is missed, so do not change it to query separately without cross
578
636
        # checking like the text key check below.
583
641
        # XXX: Should be a helper function to allow different inv representation
584
642
        # at this point.
585
643
        self.pb.update("Copying inventory texts", 2)
586
 
        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
587
 
            new_pack._writer, new_pack.inventory_index, output_lines=True)
 
644
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
 
645
        inv_lines = self._copy_nodes_graph(inventory_index_map,
 
646
            self.new_pack._writer, self.new_pack.inventory_index,
 
647
            readv_group_iter, total_items, output_lines=True)
588
648
        if self.revision_ids:
589
 
            fileid_revisions = self._pack_collection.repo._find_file_ids_from_xml_inventory_lines(
590
 
                inv_lines, self.revision_ids)
591
 
            text_filter = []
592
 
            for fileid, file_revids in fileid_revisions.iteritems():
593
 
                text_filter.extend(
594
 
                    [(fileid, file_revid) for file_revid in file_revids])
 
649
            self._process_inventory_lines(inv_lines)
595
650
        else:
596
651
            # eat the iterator to cause it to execute.
597
652
            list(inv_lines)
598
 
            text_filter = None
 
653
            self._text_filter = None
599
654
        if 'pack' in debug.debug_flags:
600
655
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
601
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
602
 
                new_pack.inventory_index.key_count(),
 
656
                time.ctime(), self._pack_collection._upload_transport.base,
 
657
                self.new_pack.random_name,
 
658
                self.new_pack.inventory_index.key_count(),
603
659
                time.time() - new_pack.start_time)
 
660
 
 
661
    def _copy_text_texts(self):
604
662
        # select text keys
605
 
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
606
 
            self.packs, 'text_index')[0]
607
 
        text_nodes = self._pack_collection._index_contents(text_index_map, text_filter)
608
 
        if text_filter is not None:
 
663
        text_index_map, text_nodes = self._get_text_nodes()
 
664
        if self._text_filter is not None:
609
665
            # We could return the keys copied as part of the return value from
610
666
            # _copy_nodes_graph but this doesn't work all that well with the
611
667
            # need to get line output too, so we check separately, and as we're
614
670
            # mising records.
615
671
            text_nodes = set(text_nodes)
616
672
            present_text_keys = set(_node[1] for _node in text_nodes)
617
 
            missing_text_keys = set(text_filter) - present_text_keys
 
673
            missing_text_keys = set(self._text_filter) - present_text_keys
618
674
            if missing_text_keys:
619
675
                # TODO: raise a specific error that can handle many missing
620
676
                # keys.
623
679
                    a_missing_key[0])
624
680
        # copy text keys and adjust values
625
681
        self.pb.update("Copying content texts", 3)
626
 
        list(self._copy_nodes_graph(text_nodes, text_index_map,
627
 
            new_pack._writer, new_pack.text_index))
 
682
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
 
683
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
 
684
            self.new_pack.text_index, readv_group_iter, total_items))
 
685
        self._log_copied_texts()
 
686
 
 
687
    def _check_references(self):
 
688
        """Make sure our external refereneces are present."""
 
689
        external_refs = self.new_pack._external_compression_parents_of_texts()
 
690
        if external_refs:
 
691
            index = self._pack_collection.text_index.combined_index
 
692
            found_items = list(index.iter_entries(external_refs))
 
693
            if len(found_items) != len(external_refs):
 
694
                found_keys = set(k for idx, k, refs, value in found_items)
 
695
                missing_items = external_refs - found_keys
 
696
                missing_file_id, missing_revision_id = missing_items.pop()
 
697
                raise errors.RevisionNotPresent(missing_revision_id,
 
698
                                                missing_file_id)
 
699
 
 
700
    def _create_pack_from_packs(self):
 
701
        self.pb.update("Opening pack", 0, 5)
 
702
        self.new_pack = self.open_pack()
 
703
        new_pack = self.new_pack
 
704
        # buffer data - we won't be reading-back during the pack creation and
 
705
        # this makes a significant difference on sftp pushes.
 
706
        new_pack.set_write_cache_size(1024*1024)
628
707
        if 'pack' in debug.debug_flags:
629
 
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
 
708
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
 
709
                for a_pack in self.packs]
 
710
            if self.revision_ids is not None:
 
711
                rev_count = len(self.revision_ids)
 
712
            else:
 
713
                rev_count = 'all'
 
714
            mutter('%s: create_pack: creating pack from source packs: '
 
715
                '%s%s %s revisions wanted %s t=0',
630
716
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
631
 
                new_pack.text_index.key_count(),
632
 
                time.time() - new_pack.start_time)
 
717
                plain_pack_list, rev_count)
 
718
        self._copy_revision_texts()
 
719
        self._copy_inventory_texts()
 
720
        self._copy_text_texts()
633
721
        # select signature keys
634
 
        signature_filter = revision_keys # same keyspace
 
722
        signature_filter = self._revision_keys # same keyspace
635
723
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
636
724
            self.packs, 'signature_index')[0]
637
725
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
645
733
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
646
734
                new_pack.signature_index.key_count(),
647
735
                time.time() - new_pack.start_time)
648
 
        if not new_pack.data_inserted():
 
736
        self._check_references()
 
737
        if not self._use_pack(new_pack):
649
738
            new_pack.abort()
650
739
            return None
651
740
        self.pb.update("Finishing pack", 5)
703
792
                pb.update("Copied record", record_index)
704
793
                record_index += 1
705
794
 
706
 
    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
707
 
        output_lines=False):
 
795
    def _copy_nodes_graph(self, index_map, writer, write_index,
 
796
        readv_group_iter, total_items, output_lines=False):
708
797
        """Copy knit nodes between packs.
709
798
 
710
799
        :param output_lines: Return lines present in the copied data as
712
801
        """
713
802
        pb = ui.ui_factory.nested_progress_bar()
714
803
        try:
715
 
            return self._do_copy_nodes_graph(nodes, index_map, writer,
716
 
                write_index, output_lines, pb)
717
 
        finally:
 
804
            for result in self._do_copy_nodes_graph(index_map, writer,
 
805
                write_index, output_lines, pb, readv_group_iter, total_items):
 
806
                yield result
 
807
        except Exception:
 
808
            # Python 2.4 does not permit try:finally: in a generator.
 
809
            pb.finished()
 
810
            raise
 
811
        else:
718
812
            pb.finished()
719
813
 
720
 
    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
721
 
        output_lines, pb):
 
814
    def _do_copy_nodes_graph(self, index_map, writer, write_index,
 
815
        output_lines, pb, readv_group_iter, total_items):
722
816
        # for record verification
723
817
        knit_data = _KnitData(None)
724
818
        # for line extraction when requested (inventories only)
725
819
        if output_lines:
726
820
            factory = knit.KnitPlainFactory()
727
 
        # plan a readv on each source pack:
728
 
        # group by pack
729
 
        nodes = sorted(nodes)
730
 
        # how to map this into knit.py - or knit.py into this?
731
 
        # we don't want the typical knit logic, we want grouping by pack
732
 
        # at this point - perhaps a helper library for the following code 
733
 
        # duplication points?
734
 
        request_groups = {}
735
821
        record_index = 0
736
 
        pb.update("Copied record", record_index, len(nodes))
737
 
        for index, key, value, references in nodes:
738
 
            if index not in request_groups:
739
 
                request_groups[index] = []
740
 
            request_groups[index].append((key, value, references))
741
 
        for index, items in request_groups.iteritems():
742
 
            pack_readv_requests = []
743
 
            for key, value, references in items:
744
 
                # ---- KnitGraphIndex.get_position
745
 
                bits = value[1:].split(' ')
746
 
                offset, length = int(bits[0]), int(bits[1])
747
 
                pack_readv_requests.append((offset, length, (key, value[0], references)))
748
 
            # linear scan up the pack
749
 
            pack_readv_requests.sort()
 
822
        pb.update("Copied record", record_index, total_items)
 
823
        for index, readv_vector, node_vector in readv_group_iter:
750
824
            # copy the data
751
825
            transport, path = index_map[index]
752
 
            reader = pack.make_readv_reader(transport, path,
753
 
                [offset[0:2] for offset in pack_readv_requests])
754
 
            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
755
 
                izip(reader.iter_records(), pack_readv_requests):
 
826
            reader = pack.make_readv_reader(transport, path, readv_vector)
 
827
            for (names, read_func), (key, eol_flag, references) in \
 
828
                izip(reader.iter_records(), node_vector):
756
829
                raw_data = read_func(None)
757
830
                version_id = key[-1]
758
831
                if output_lines:
773
846
                pb.update("Copied record", record_index)
774
847
                record_index += 1
775
848
 
 
849
    def _get_text_nodes(self):
 
850
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
 
851
            self.packs, 'text_index')[0]
 
852
        return text_index_map, self._pack_collection._index_contents(text_index_map,
 
853
            self._text_filter)
 
854
 
 
855
    def _least_readv_node_readv(self, nodes):
 
856
        """Generate request groups for nodes using the least readv's.
 
857
        
 
858
        :param nodes: An iterable of graph index nodes.
 
859
        :return: Total node count and an iterator of the data needed to perform
 
860
            readvs to obtain the data for nodes. Each item yielded by the
 
861
            iterator is a tuple with:
 
862
            index, readv_vector, node_vector. readv_vector is a list ready to
 
863
            hand to the transport readv method, and node_vector is a list of
 
864
            (key, eol_flag, references) for the the node retrieved by the
 
865
            matching readv_vector.
 
866
        """
 
867
        # group by pack so we do one readv per pack
 
868
        nodes = sorted(nodes)
 
869
        total = len(nodes)
 
870
        request_groups = {}
 
871
        for index, key, value, references in nodes:
 
872
            if index not in request_groups:
 
873
                request_groups[index] = []
 
874
            request_groups[index].append((key, value, references))
 
875
        result = []
 
876
        for index, items in request_groups.iteritems():
 
877
            pack_readv_requests = []
 
878
            for key, value, references in items:
 
879
                # ---- KnitGraphIndex.get_position
 
880
                bits = value[1:].split(' ')
 
881
                offset, length = int(bits[0]), int(bits[1])
 
882
                pack_readv_requests.append(
 
883
                    ((offset, length), (key, value[0], references)))
 
884
            # linear scan up the pack to maximum range combining.
 
885
            pack_readv_requests.sort()
 
886
            # split out the readv and the node data.
 
887
            pack_readv = [readv for readv, node in pack_readv_requests]
 
888
            node_vector = [node for readv, node in pack_readv_requests]
 
889
            result.append((index, pack_readv, node_vector))
 
890
        return total, result
 
891
 
 
892
    def _log_copied_texts(self):
 
893
        if 'pack' in debug.debug_flags:
 
894
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
 
895
                time.ctime(), self._pack_collection._upload_transport.base,
 
896
                self.new_pack.random_name,
 
897
                self.new_pack.text_index.key_count(),
 
898
                time.time() - self.new_pack.start_time)
 
899
 
 
900
    def _process_inventory_lines(self, inv_lines):
 
901
        """Use up the inv_lines generator and setup a text key filter."""
 
902
        repo = self._pack_collection.repo
 
903
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
 
904
            inv_lines, self.revision_ids)
 
905
        text_filter = []
 
906
        for fileid, file_revids in fileid_revisions.iteritems():
 
907
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
 
908
        self._text_filter = text_filter
 
909
 
 
910
    def _revision_node_readv(self, revision_nodes):
 
911
        """Return the total revisions and the readv's to issue.
 
912
 
 
913
        :param revision_nodes: The revision index contents for the packs being
 
914
            incorporated into the new pack.
 
915
        :return: As per _least_readv_node_readv.
 
916
        """
 
917
        return self._least_readv_node_readv(revision_nodes)
 
918
 
 
919
    def _use_pack(self, new_pack):
 
920
        """Return True if new_pack should be used.
 
921
 
 
922
        :param new_pack: The pack that has just been created.
 
923
        :return: True if the pack should be used.
 
924
        """
 
925
        return new_pack.data_inserted()
 
926
 
 
927
 
 
928
class OptimisingPacker(Packer):
 
929
    """A packer which spends more time to create better disk layouts."""
 
930
 
 
931
    def _revision_node_readv(self, revision_nodes):
 
932
        """Return the total revisions and the readv's to issue.
 
933
 
 
934
        This sort places revisions in topological order with the ancestors
 
935
        after the children.
 
936
 
 
937
        :param revision_nodes: The revision index contents for the packs being
 
938
            incorporated into the new pack.
 
939
        :return: As per _least_readv_node_readv.
 
940
        """
 
941
        # build an ancestors dict
 
942
        ancestors = {}
 
943
        by_key = {}
 
944
        for index, key, value, references in revision_nodes:
 
945
            ancestors[key] = references[0]
 
946
            by_key[key] = (index, value, references)
 
947
        order = tsort.topo_sort(ancestors)
 
948
        total = len(order)
 
949
        # Single IO is pathological, but it will work as a starting point.
 
950
        requests = []
 
951
        for key in reversed(order):
 
952
            index, value, references = by_key[key]
 
953
            # ---- KnitGraphIndex.get_position
 
954
            bits = value[1:].split(' ')
 
955
            offset, length = int(bits[0]), int(bits[1])
 
956
            requests.append(
 
957
                (index, [(offset, length)], [(key, value[0], references)]))
 
958
        # TODO: combine requests in the same index that are in ascending order.
 
959
        return total, requests
 
960
 
776
961
 
777
962
class ReconcilePacker(Packer):
778
963
    """A packer which regenerates indices etc as it copies.
781
966
    regenerated.
782
967
    """
783
968
 
 
969
    def _extra_init(self):
 
970
        self._data_changed = False
 
971
 
 
972
    def _process_inventory_lines(self, inv_lines):
 
973
        """Generate a text key reference map rather for reconciling with."""
 
974
        repo = self._pack_collection.repo
 
975
        refs = repo._find_text_key_references_from_xml_inventory_lines(
 
976
            inv_lines)
 
977
        self._text_refs = refs
 
978
        # during reconcile we:
 
979
        #  - convert unreferenced texts to full texts
 
980
        #  - correct texts which reference a text not copied to be full texts
 
981
        #  - copy all others as-is but with corrected parents.
 
982
        #  - so at this point we don't know enough to decide what becomes a full
 
983
        #    text.
 
984
        self._text_filter = None
 
985
 
 
986
    def _copy_text_texts(self):
 
987
        """generate what texts we should have and then copy."""
 
988
        self.pb.update("Copying content texts", 3)
 
989
        # we have three major tasks here:
 
990
        # 1) generate the ideal index
 
991
        repo = self._pack_collection.repo
 
992
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
 
993
            _1, key, _2, refs in 
 
994
            self.new_pack.revision_index.iter_all_entries()])
 
995
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
 
996
        # 2) generate a text_nodes list that contains all the deltas that can
 
997
        #    be used as-is, with corrected parents.
 
998
        ok_nodes = []
 
999
        bad_texts = []
 
1000
        discarded_nodes = []
 
1001
        NULL_REVISION = _mod_revision.NULL_REVISION
 
1002
        text_index_map, text_nodes = self._get_text_nodes()
 
1003
        for node in text_nodes:
 
1004
            # 0 - index
 
1005
            # 1 - key 
 
1006
            # 2 - value
 
1007
            # 3 - refs
 
1008
            try:
 
1009
                ideal_parents = tuple(ideal_index[node[1]])
 
1010
            except KeyError:
 
1011
                discarded_nodes.append(node)
 
1012
                self._data_changed = True
 
1013
            else:
 
1014
                if ideal_parents == (NULL_REVISION,):
 
1015
                    ideal_parents = ()
 
1016
                if ideal_parents == node[3][0]:
 
1017
                    # no change needed.
 
1018
                    ok_nodes.append(node)
 
1019
                elif ideal_parents[0:1] == node[3][0][0:1]:
 
1020
                    # the left most parent is the same, or there are no parents
 
1021
                    # today. Either way, we can preserve the representation as
 
1022
                    # long as we change the refs to be inserted.
 
1023
                    self._data_changed = True
 
1024
                    ok_nodes.append((node[0], node[1], node[2],
 
1025
                        (ideal_parents, node[3][1])))
 
1026
                    self._data_changed = True
 
1027
                else:
 
1028
                    # Reinsert this text completely
 
1029
                    bad_texts.append((node[1], ideal_parents))
 
1030
                    self._data_changed = True
 
1031
        # we're finished with some data.
 
1032
        del ideal_index
 
1033
        del text_nodes
 
1034
        # 3) bulk copy the ok data
 
1035
        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
 
1036
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
 
1037
            self.new_pack.text_index, readv_group_iter, total_items))
 
1038
        # 4) adhoc copy all the other texts.
 
1039
        # We have to topologically insert all texts otherwise we can fail to
 
1040
        # reconcile when parts of a single delta chain are preserved intact,
 
1041
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
 
1042
        # reinserted, and if d3 has incorrect parents it will also be
 
1043
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
 
1044
        # copied), so we will try to delta, but d2 is not currently able to be
 
1045
        # extracted because it's basis d1 is not present. Topologically sorting
 
1046
        # addresses this. The following generates a sort for all the texts that
 
1047
        # are being inserted without having to reference the entire text key
 
1048
        # space (we only topo sort the revisions, which is smaller).
 
1049
        topo_order = tsort.topo_sort(ancestors)
 
1050
        rev_order = dict(zip(topo_order, range(len(topo_order))))
 
1051
        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
 
1052
        transaction = repo.get_transaction()
 
1053
        file_id_index = GraphIndexPrefixAdapter(
 
1054
            self.new_pack.text_index,
 
1055
            ('blank', ), 1,
 
1056
            add_nodes_callback=self.new_pack.text_index.add_nodes)
 
1057
        knit_index = KnitGraphIndex(file_id_index,
 
1058
            add_callback=file_id_index.add_nodes,
 
1059
            deltas=True, parents=True)
 
1060
        output_knit = knit.KnitVersionedFile('reconcile-texts',
 
1061
            self._pack_collection.transport,
 
1062
            None,
 
1063
            index=knit_index,
 
1064
            access_method=_PackAccess(
 
1065
                {self.new_pack.text_index:self.new_pack.access_tuple()},
 
1066
                (self.new_pack._writer, self.new_pack.text_index)),
 
1067
            factory=knit.KnitPlainFactory())
 
1068
        for key, parent_keys in bad_texts:
 
1069
            # We refer to the new pack to delta data being output.
 
1070
            # A possible improvement would be to catch errors on short reads
 
1071
            # and only flush then.
 
1072
            self.new_pack.flush()
 
1073
            parents = []
 
1074
            for parent_key in parent_keys:
 
1075
                if parent_key[0] != key[0]:
 
1076
                    # Graph parents must match the fileid
 
1077
                    raise errors.BzrError('Mismatched key parent %r:%r' %
 
1078
                        (key, parent_keys))
 
1079
                parents.append(parent_key[1])
 
1080
            source_weave = repo.weave_store.get_weave(key[0], transaction)
 
1081
            text_lines = source_weave.get_lines(key[1])
 
1082
            # adapt the 'knit' to the current file_id.
 
1083
            file_id_index = GraphIndexPrefixAdapter(
 
1084
                self.new_pack.text_index,
 
1085
                (key[0], ), 1,
 
1086
                add_nodes_callback=self.new_pack.text_index.add_nodes)
 
1087
            knit_index._graph_index = file_id_index
 
1088
            knit_index._add_callback = file_id_index.add_nodes
 
1089
            output_knit.add_lines_with_ghosts(
 
1090
                key[1], parents, text_lines, random_id=True, check_content=False)
 
1091
        # 5) check that nothing inserted has a reference outside the keyspace.
 
1092
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
 
1093
        if missing_text_keys:
 
1094
            raise errors.BzrError('Reference to missing compression parents %r'
 
1095
                % (refs - keys,))
 
1096
        self._log_copied_texts()
 
1097
 
 
1098
    def _use_pack(self, new_pack):
 
1099
        """Override _use_pack to check for reconcile having changed content."""
 
1100
        # XXX: we might be better checking this at the copy time.
 
1101
        original_inventory_keys = set()
 
1102
        inv_index = self._pack_collection.inventory_index.combined_index
 
1103
        for entry in inv_index.iter_all_entries():
 
1104
            original_inventory_keys.add(entry[1])
 
1105
        new_inventory_keys = set()
 
1106
        for entry in new_pack.inventory_index.iter_all_entries():
 
1107
            new_inventory_keys.add(entry[1])
 
1108
        if new_inventory_keys != original_inventory_keys:
 
1109
            self._data_changed = True
 
1110
        return new_pack.data_inserted() and self._data_changed
 
1111
 
784
1112
 
785
1113
class RepositoryPackCollection(object):
786
1114
    """Management of packs within a repository."""
901
1229
        self._execute_pack_operations(pack_operations)
902
1230
        return True
903
1231
 
904
 
    def _execute_pack_operations(self, pack_operations):
 
1232
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
905
1233
        """Execute a series of pack operations.
906
1234
 
907
1235
        :param pack_operations: A list of [revision_count, packs_to_combine].
 
1236
        :param _packer_class: The class of packer to use (default: Packer).
908
1237
        :return: None.
909
1238
        """
910
1239
        for revision_count, packs in pack_operations:
911
1240
            # we may have no-ops from the setup logic
912
1241
            if len(packs) == 0:
913
1242
                continue
914
 
            Packer(self, packs, '.autopack').pack()
 
1243
            _packer_class(self, packs, '.autopack').pack()
915
1244
            for pack in packs:
916
1245
                self._remove_pack_from_memory(pack)
917
1246
        # record the newly available packs and stop advertising the old
934
1263
        self.ensure_loaded()
935
1264
        total_packs = len(self._names)
936
1265
        if total_packs < 2:
 
1266
            # This is arguably wrong because we might not be optimal, but for
 
1267
            # now lets leave it in. (e.g. reconcile -> one pack. But not
 
1268
            # optimal.
937
1269
            return
938
1270
        total_revisions = self.revision_index.combined_index.key_count()
939
1271
        # XXX: the following may want to be a class, to pack with a given
945
1277
        pack_distribution = [1]
946
1278
        pack_operations = [[0, []]]
947
1279
        for pack in self.all_packs():
948
 
            revision_count = pack.get_revision_count()
949
 
            pack_operations[-1][0] += revision_count
 
1280
            pack_operations[-1][0] += pack.get_revision_count()
950
1281
            pack_operations[-1][1].append(pack)
951
 
        self._execute_pack_operations(pack_operations)
 
1282
        self._execute_pack_operations(pack_operations, OptimisingPacker)
952
1283
 
953
1284
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
954
1285
        """Plan a pack operation.
995
1326
    def ensure_loaded(self):
996
1327
        # NB: if you see an assertion error here, its probably access against
997
1328
        # an unlocked repo. Naughty.
998
 
        assert self.repo.is_locked()
 
1329
        if not self.repo.is_locked():
 
1330
            raise errors.ObjectNotLocked(self.repo)
999
1331
        if self._names is None:
1000
1332
            self._names = {}
1001
1333
            self._packs_at_load = set()
1036
1368
        """
1037
1369
        self.ensure_loaded()
1038
1370
        if a_new_pack.name in self._names:
1039
 
            # a collision with the packs we know about (not the only possible
1040
 
            # collision, see NewPack.finish() for some discussion). Remove our
1041
 
            # prior reference to it.
1042
 
            self._remove_pack_from_memory(a_new_pack)
 
1371
            raise errors.BzrError(
 
1372
                'Pack %r already exists in %s' % (a_new_pack.name, self))
1043
1373
        self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
1044
1374
        self.add_pack_to_memory(a_new_pack)
1045
1375
 
1245
1575
            # changing it.
1246
1576
            for key, value in disk_nodes:
1247
1577
                builder.add_node(key, value)
1248
 
            self.transport.put_file('pack-names', builder.finish())
 
1578
            self.transport.put_file('pack-names', builder.finish(),
 
1579
                mode=self.repo.control_files._file_mode)
1249
1580
            # move the baseline forward
1250
1581
            self._packs_at_load = disk_nodes
1251
1582
            # now clear out the obsolete packs directory
1288
1619
        if not self.repo.is_write_locked():
1289
1620
            raise errors.NotWriteLocked(self)
1290
1621
        self._new_pack = NewPack(self._upload_transport, self._index_transport,
1291
 
            self._pack_transport, upload_suffix='.pack')
 
1622
            self._pack_transport, upload_suffix='.pack',
 
1623
            file_mode=self.repo.control_files._file_mode)
1292
1624
        # allow writing: queue writes to a new index
1293
1625
        self.revision_index.add_writable_index(self._new_pack.revision_index,
1294
1626
            self._new_pack)
1316
1648
    def _abort_write_group(self):
1317
1649
        # FIXME: just drop the transient index.
1318
1650
        # forget what names there are
1319
 
        self._new_pack.abort()
1320
 
        self._remove_pack_indices(self._new_pack)
1321
 
        self._new_pack = None
 
1651
        if self._new_pack is not None:
 
1652
            self._new_pack.abort()
 
1653
            self._remove_pack_indices(self._new_pack)
 
1654
            self._new_pack = None
1322
1655
        self.repo._text_knit = None
1323
1656
 
1324
1657
    def _commit_write_group(self):
1508
1841
        self._transaction = None
1509
1842
        # for tests
1510
1843
        self._reconcile_does_inventory_gc = True
1511
 
        self._reconcile_fixes_text_parents = False
 
1844
        self._reconcile_fixes_text_parents = True
1512
1845
        self._reconcile_backsup_inventory = False
1513
1846
 
1514
1847
    def _abort_write_group(self):
1529
1862
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
1530
1863
            parents-in-revision).
1531
1864
        """
1532
 
        assert self.is_locked()
 
1865
        if not self.is_locked():
 
1866
            raise errors.ObjectNotLocked(self)
1533
1867
        pb = ui.ui_factory.nested_progress_bar()
1534
1868
        result = []
1535
1869
        try:
1562
1896
            pb.finished()
1563
1897
        return result
1564
1898
 
 
1899
    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
1565
1900
    def get_parents(self, revision_ids):
1566
 
        """See StackedParentsProvider.get_parents.
1567
 
        
 
1901
        """See graph._StackedParentsProvider.get_parents."""
 
1902
        parent_map = self.get_parent_map(revision_ids)
 
1903
        return [parent_map.get(r, None) for r in revision_ids]
 
1904
 
 
1905
    def get_parent_map(self, keys):
 
1906
        """See graph._StackedParentsProvider.get_parent_map
 
1907
 
1568
1908
        This implementation accesses the combined revision index to provide
1569
1909
        answers.
1570
1910
        """
1571
1911
        self._pack_collection.ensure_loaded()
1572
1912
        index = self._pack_collection.revision_index.combined_index
1573
 
        search_keys = set()
1574
 
        for revision_id in revision_ids:
1575
 
            if revision_id != _mod_revision.NULL_REVISION:
1576
 
                search_keys.add((revision_id,))
1577
 
        found_parents = {_mod_revision.NULL_REVISION:[]}
 
1913
        keys = set(keys)
 
1914
        if _mod_revision.NULL_REVISION in keys:
 
1915
            keys.discard(_mod_revision.NULL_REVISION)
 
1916
            found_parents = {_mod_revision.NULL_REVISION:()}
 
1917
        else:
 
1918
            found_parents = {}
 
1919
        search_keys = set((revision_id,) for revision_id in keys)
1578
1920
        for index, key, value, refs in index.iter_entries(search_keys):
1579
1921
            parents = refs[0]
1580
1922
            if not parents:
1582
1924
            else:
1583
1925
                parents = tuple(parent[0] for parent in parents)
1584
1926
            found_parents[key[0]] = parents
1585
 
        result = []
1586
 
        for revision_id in revision_ids:
1587
 
            try:
1588
 
                result.append(found_parents[revision_id])
1589
 
            except KeyError:
1590
 
                result.append(None)
 
1927
        return found_parents
 
1928
 
 
1929
    def has_revisions(self, revision_ids):
 
1930
        """See Repository.has_revisions()."""
 
1931
        revision_ids = set(revision_ids)
 
1932
        result = revision_ids.intersection(
 
1933
            set([None, _mod_revision.NULL_REVISION]))
 
1934
        revision_ids.difference_update(result)
 
1935
        index = self._pack_collection.revision_index.combined_index
 
1936
        keys = [(revision_id,) for revision_id in revision_ids]
 
1937
        result.update(node[1][0] for node in index.iter_entries(keys))
1591
1938
        return result
1592
1939
 
1593
1940
    def _make_parents_provider(self):
1594
 
        return self
 
1941
        return graph.CachingParentsProvider(self)
1595
1942
 
1596
1943
    def _refresh_data(self):
1597
1944
        if self._write_lock_count == 1 or (
1794
2141
 
1795
2142
 
1796
2143
class RepositoryFormatKnitPack1(RepositoryFormatPack):
1797
 
    """A no-subtrees parameterised Pack repository.
 
2144
    """A no-subtrees parameterized Pack repository.
1798
2145
 
1799
2146
    This format was introduced in 0.92.
1800
2147
    """
1804
2151
    _serializer = xml5.serializer_v5
1805
2152
 
1806
2153
    def _get_matching_bzrdir(self):
1807
 
        return bzrdir.format_registry.make_bzrdir('knitpack-experimental')
 
2154
        return bzrdir.format_registry.make_bzrdir('pack-0.92')
1808
2155
 
1809
2156
    def _ignore_setting_bzrdir(self, format):
1810
2157
        pass
1824
2171
 
1825
2172
 
1826
2173
class RepositoryFormatKnitPack3(RepositoryFormatPack):
1827
 
    """A subtrees parameterised Pack repository.
 
2174
    """A subtrees parameterized Pack repository.
1828
2175
 
1829
2176
    This repository format uses the xml7 serializer to get:
1830
2177
     - support for recording full info about the tree root
1841
2188
 
1842
2189
    def _get_matching_bzrdir(self):
1843
2190
        return bzrdir.format_registry.make_bzrdir(
1844
 
            'knitpack-subtree-experimental')
 
2191
            'pack-0.92-subtree')
1845
2192
 
1846
2193
    def _ignore_setting_bzrdir(self, format):
1847
2194
        pass
1863
2210
    def get_format_description(self):
1864
2211
        """See RepositoryFormat.get_format_description()."""
1865
2212
        return "Packs containing knits with subtree support\n"
 
2213
 
 
2214
 
 
2215
class RepositoryFormatKnitPack4(RepositoryFormatPack):
 
2216
    """A rich-root, no subtrees parameterized Pack repository.
 
2217
 
 
2218
    This repository format uses the xml6 serializer to get:
 
2219
     - support for recording full info about the tree root
 
2220
 
 
2221
    This format was introduced in 1.0.
 
2222
    """
 
2223
 
 
2224
    repository_class = KnitPackRepository
 
2225
    _commit_builder_class = PackRootCommitBuilder
 
2226
    rich_root_data = True
 
2227
    supports_tree_reference = False
 
2228
    _serializer = xml6.serializer_v6
 
2229
 
 
2230
    def _get_matching_bzrdir(self):
 
2231
        return bzrdir.format_registry.make_bzrdir(
 
2232
            'rich-root-pack')
 
2233
 
 
2234
    def _ignore_setting_bzrdir(self, format):
 
2235
        pass
 
2236
 
 
2237
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
2238
 
 
2239
    def check_conversion_target(self, target_format):
 
2240
        if not target_format.rich_root_data:
 
2241
            raise errors.BadConversionTarget(
 
2242
                'Does not support rich root data.', target_format)
 
2243
 
 
2244
    def get_format_string(self):
 
2245
        """See RepositoryFormat.get_format_string()."""
 
2246
        return ("Bazaar pack repository format 1 with rich root"
 
2247
                " (needs bzr 1.0)\n")
 
2248
 
 
2249
    def get_format_description(self):
 
2250
        """See RepositoryFormat.get_format_description()."""
 
2251
        return "Packs containing knits with rich root support\n"