/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Lukáš Lalinský
  • Date: 2007-12-17 17:28:25 UTC
  • mfrom: (3120 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3123.
  • Revision ID: lalinsky@gmail.com-20071217172825-tr3pqm1mhvs3gwnn
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
        pack,
27
27
        ui,
28
28
        )
 
29
from bzrlib.graph import Graph
29
30
from bzrlib.index import (
30
31
    GraphIndex,
31
32
    GraphIndexBuilder,
37
38
from bzrlib.osutils import rand_chars
38
39
from bzrlib.pack import ContainerWriter
39
40
from bzrlib.store import revision
 
41
from bzrlib import tsort
40
42
""")
41
43
from bzrlib import (
42
44
    bzrdir,
48
50
    osutils,
49
51
    transactions,
50
52
    xml5,
 
53
    xml6,
51
54
    xml7,
52
55
    )
53
56
 
72
75
    added text, reducing memory and object pressure.
73
76
    """
74
77
 
 
78
    def __init__(self, repository, parents, config, timestamp=None,
 
79
                 timezone=None, committer=None, revprops=None,
 
80
                 revision_id=None):
 
81
        CommitBuilder.__init__(self, repository, parents, config,
 
82
            timestamp=timestamp, timezone=timezone, committer=committer,
 
83
            revprops=revprops, revision_id=revision_id)
 
84
        self._file_graph = Graph(
 
85
            repository._pack_collection.text_index.combined_index)
 
86
 
75
87
    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
76
88
        return self.repository._pack_collection._add_text_to_weave(file_id,
77
89
            self._new_revision_id, new_lines, parents, nostore_sha,
78
90
            self.random_revid)
79
91
 
 
92
    def _heads(self, file_id, revision_ids):
 
93
        keys = [(file_id, revision_id) for revision_id in revision_ids]
 
94
        return set([key[1] for key in self._file_graph.heads(keys)])
 
95
 
80
96
 
81
97
class PackRootCommitBuilder(RootCommitBuilder):
82
98
    """A subclass of RootCommitBuilder to add texts with pack semantics.
85
101
    added text, reducing memory and object pressure.
86
102
    """
87
103
 
 
104
    def __init__(self, repository, parents, config, timestamp=None,
 
105
                 timezone=None, committer=None, revprops=None,
 
106
                 revision_id=None):
 
107
        CommitBuilder.__init__(self, repository, parents, config,
 
108
            timestamp=timestamp, timezone=timezone, committer=committer,
 
109
            revprops=revprops, revision_id=revision_id)
 
110
        self._file_graph = Graph(
 
111
            repository._pack_collection.text_index.combined_index)
 
112
 
88
113
    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
89
114
        return self.repository._pack_collection._add_text_to_weave(file_id,
90
115
            self._new_revision_id, new_lines, parents, nostore_sha,
91
116
            self.random_revid)
92
117
 
 
118
    def _heads(self, file_id, revision_ids):
 
119
        keys = [(file_id, revision_id) for revision_id in revision_ids]
 
120
        return set([key[1] for key in self._file_graph.heads(keys)])
 
121
 
93
122
 
94
123
class Pack(object):
95
124
    """An in memory proxy for a pack and its indices.
145
174
        """The text index is the name + .tix."""
146
175
        return self.index_name('text', name)
147
176
 
 
177
    def _external_compression_parents_of_texts(self):
 
178
        keys = set()
 
179
        refs = set()
 
180
        for node in self.text_index.iter_all_entries():
 
181
            keys.add(node[1])
 
182
            refs.update(node[3][1])
 
183
        return refs - keys
 
184
 
148
185
 
149
186
class ExistingPack(Pack):
150
187
    """An in memory proxy for an existing .pack and its disk indices."""
187
224
        }
188
225
 
189
226
    def __init__(self, upload_transport, index_transport, pack_transport,
190
 
        upload_suffix=''):
 
227
        upload_suffix='', file_mode=None):
191
228
        """Create a NewPack instance.
192
229
 
193
230
        :param upload_transport: A writable transport for the pack to be
199
236
            upload_transport.clone('../packs').
200
237
        :param upload_suffix: An optional suffix to be given to any temporary
201
238
            files created during the pack creation. e.g '.autopack'
 
239
        :param file_mode: An optional file mode to create the new files with.
202
240
        """
203
241
        # The relative locations of the packs are constrained, but all are
204
242
        # passed in because the caller has them, so as to avoid object churn.
223
261
        self.index_transport = index_transport
224
262
        # where is the pack renamed to when it is finished?
225
263
        self.pack_transport = pack_transport
 
264
        # What file mode to upload the pack and indices with.
 
265
        self._file_mode = file_mode
226
266
        # tracks the content written to the .pack file.
227
267
        self._hash = md5.new()
228
268
        # a four-tuple with the length in bytes of the indices, once the pack
239
279
        self.start_time = time.time()
240
280
        # open an output stream for the data added to the pack.
241
281
        self.write_stream = self.upload_transport.open_write_stream(
242
 
            self.random_name)
 
282
            self.random_name, mode=self._file_mode)
243
283
        if 'pack' in debug.debug_flags:
244
284
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
245
285
                time.ctime(), self.upload_transport.base, self.random_name,
342
382
                self.pack_transport, self.name,
343
383
                time.time() - self.start_time)
344
384
 
 
385
    def flush(self):
 
386
        """Flush any current data."""
 
387
        if self._buffer[1]:
 
388
            bytes = ''.join(self._buffer[0])
 
389
            self.write_stream.write(bytes)
 
390
            self._hash.update(bytes)
 
391
            self._buffer[:] = [[], 0]
 
392
 
345
393
    def index_name(self, index_type, name):
346
394
        """Get the disk name of an index type for pack name 'name'."""
347
395
        return name + NewPack.index_definitions[index_type][0]
368
416
        """
369
417
        index_name = self.index_name(index_type, self.name)
370
418
        self.index_sizes[self.index_offset(index_type)] = \
371
 
            self.index_transport.put_file(index_name, index.finish())
 
419
            self.index_transport.put_file(index_name, index.finish(),
 
420
            mode=self._file_mode)
372
421
        if 'pack' in debug.debug_flags:
373
422
            # XXX: size might be interesting?
374
423
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
490
539
        self.packs = packs
491
540
        self.suffix = suffix
492
541
        self.revision_ids = revision_ids
 
542
        # The pack object we are creating.
 
543
        self.new_pack = None
493
544
        self._pack_collection = pack_collection
 
545
        # The index layer keys for the revisions being copied. None for 'all
 
546
        # objects'.
 
547
        self._revision_keys = None
 
548
        # What text keys to copy. None for 'all texts'. This is set by
 
549
        # _copy_inventory_texts
 
550
        self._text_filter = None
 
551
        self._extra_init()
 
552
 
 
553
    def _extra_init(self):
 
554
        """A template hook to allow extending the constructor trivially."""
494
555
 
495
556
    def pack(self, pb=None):
496
557
        """Create a new pack by reading data from other packs.
533
594
        """Open a pack for the pack we are creating."""
534
595
        return NewPack(self._pack_collection._upload_transport,
535
596
            self._pack_collection._index_transport,
536
 
            self._pack_collection._pack_transport, upload_suffix=self.suffix)
 
597
            self._pack_collection._pack_transport, upload_suffix=self.suffix,
 
598
            file_mode=self._pack_collection.repo.control_files._file_mode)
537
599
 
538
 
    def _create_pack_from_packs(self):
539
 
        self.pb.update("Opening pack", 0, 5)
540
 
        new_pack = self.open_pack()
541
 
        # buffer data - we won't be reading-back during the pack creation and
542
 
        # this makes a significant difference on sftp pushes.
543
 
        new_pack.set_write_cache_size(1024*1024)
544
 
        if 'pack' in debug.debug_flags:
545
 
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
546
 
                for a_pack in self.packs]
547
 
            if self.revision_ids is not None:
548
 
                rev_count = len(self.revision_ids)
549
 
            else:
550
 
                rev_count = 'all'
551
 
            mutter('%s: create_pack: creating pack from source packs: '
552
 
                '%s%s %s revisions wanted %s t=0',
553
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
554
 
                plain_pack_list, rev_count)
 
600
    def _copy_revision_texts(self):
 
601
        """Copy revision data to the new pack."""
555
602
        # select revisions
556
603
        if self.revision_ids:
557
604
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
558
605
        else:
559
606
            revision_keys = None
560
 
 
561
607
        # select revision keys
562
608
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
563
609
            self.packs, 'revision_index')[0]
564
610
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
565
611
        # copy revision keys and adjust values
566
612
        self.pb.update("Copying revision texts", 1)
567
 
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
568
 
            new_pack._writer, new_pack.revision_index))
 
613
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
 
614
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
 
615
            self.new_pack.revision_index, readv_group_iter, total_items))
569
616
        if 'pack' in debug.debug_flags:
570
617
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
571
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
572
 
                new_pack.revision_index.key_count(),
573
 
                time.time() - new_pack.start_time)
 
618
                time.ctime(), self._pack_collection._upload_transport.base,
 
619
                self.new_pack.random_name,
 
620
                self.new_pack.revision_index.key_count(),
 
621
                time.time() - self.new_pack.start_time)
 
622
        self._revision_keys = revision_keys
 
623
 
 
624
    def _copy_inventory_texts(self):
 
625
        """Copy the inventory texts to the new pack.
 
626
 
 
627
        self._revision_keys is used to determine what inventories to copy.
 
628
 
 
629
        Sets self._text_filter appropriately.
 
630
        """
574
631
        # select inventory keys
575
 
        inv_keys = revision_keys # currently the same keyspace, and note that
 
632
        inv_keys = self._revision_keys # currently the same keyspace, and note that
576
633
        # querying for keys here could introduce a bug where an inventory item
577
634
        # is missed, so do not change it to query separately without cross
578
635
        # checking like the text key check below.
583
640
        # XXX: Should be a helper function to allow different inv representation
584
641
        # at this point.
585
642
        self.pb.update("Copying inventory texts", 2)
586
 
        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
587
 
            new_pack._writer, new_pack.inventory_index, output_lines=True)
 
643
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
 
644
        inv_lines = self._copy_nodes_graph(inventory_index_map,
 
645
            self.new_pack._writer, self.new_pack.inventory_index,
 
646
            readv_group_iter, total_items, output_lines=True)
588
647
        if self.revision_ids:
589
 
            fileid_revisions = self._pack_collection.repo._find_file_ids_from_xml_inventory_lines(
590
 
                inv_lines, self.revision_ids)
591
 
            text_filter = []
592
 
            for fileid, file_revids in fileid_revisions.iteritems():
593
 
                text_filter.extend(
594
 
                    [(fileid, file_revid) for file_revid in file_revids])
 
648
            self._process_inventory_lines(inv_lines)
595
649
        else:
596
650
            # eat the iterator to cause it to execute.
597
651
            list(inv_lines)
598
 
            text_filter = None
 
652
            self._text_filter = None
599
653
        if 'pack' in debug.debug_flags:
600
654
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
601
 
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
602
 
                new_pack.inventory_index.key_count(),
 
655
                time.ctime(), self._pack_collection._upload_transport.base,
 
656
                self.new_pack.random_name,
 
657
                self.new_pack.inventory_index.key_count(),
603
658
                time.time() - new_pack.start_time)
 
659
 
 
660
    def _copy_text_texts(self):
604
661
        # select text keys
605
 
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
606
 
            self.packs, 'text_index')[0]
607
 
        text_nodes = self._pack_collection._index_contents(text_index_map, text_filter)
608
 
        if text_filter is not None:
 
662
        text_index_map, text_nodes = self._get_text_nodes()
 
663
        if self._text_filter is not None:
609
664
            # We could return the keys copied as part of the return value from
610
665
            # _copy_nodes_graph but this doesn't work all that well with the
611
666
            # need to get line output too, so we check separately, and as we're
614
669
            # mising records.
615
670
            text_nodes = set(text_nodes)
616
671
            present_text_keys = set(_node[1] for _node in text_nodes)
617
 
            missing_text_keys = set(text_filter) - present_text_keys
 
672
            missing_text_keys = set(self._text_filter) - present_text_keys
618
673
            if missing_text_keys:
619
674
                # TODO: raise a specific error that can handle many missing
620
675
                # keys.
623
678
                    a_missing_key[0])
624
679
        # copy text keys and adjust values
625
680
        self.pb.update("Copying content texts", 3)
626
 
        list(self._copy_nodes_graph(text_nodes, text_index_map,
627
 
            new_pack._writer, new_pack.text_index))
 
681
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
 
682
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
 
683
            self.new_pack.text_index, readv_group_iter, total_items))
 
684
        self._log_copied_texts()
 
685
 
 
686
    def _check_references(self):
 
687
        """Make sure our external refereneces are present."""
 
688
        external_refs = self.new_pack._external_compression_parents_of_texts()
 
689
        if external_refs:
 
690
            index = self._pack_collection.text_index.combined_index
 
691
            found_items = list(index.iter_entries(external_refs))
 
692
            if len(found_items) != len(external_refs):
 
693
                found_keys = set(k for idx, k, refs, value in found_items)
 
694
                missing_items = external_refs - found_keys
 
695
                missing_file_id, missing_revision_id = missing_items.pop()
 
696
                raise errors.RevisionNotPresent(missing_revision_id,
 
697
                                                missing_file_id)
 
698
 
 
699
    def _create_pack_from_packs(self):
 
700
        self.pb.update("Opening pack", 0, 5)
 
701
        self.new_pack = self.open_pack()
 
702
        new_pack = self.new_pack
 
703
        # buffer data - we won't be reading-back during the pack creation and
 
704
        # this makes a significant difference on sftp pushes.
 
705
        new_pack.set_write_cache_size(1024*1024)
628
706
        if 'pack' in debug.debug_flags:
629
 
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
 
707
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
 
708
                for a_pack in self.packs]
 
709
            if self.revision_ids is not None:
 
710
                rev_count = len(self.revision_ids)
 
711
            else:
 
712
                rev_count = 'all'
 
713
            mutter('%s: create_pack: creating pack from source packs: '
 
714
                '%s%s %s revisions wanted %s t=0',
630
715
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
631
 
                new_pack.text_index.key_count(),
632
 
                time.time() - new_pack.start_time)
 
716
                plain_pack_list, rev_count)
 
717
        self._copy_revision_texts()
 
718
        self._copy_inventory_texts()
 
719
        self._copy_text_texts()
633
720
        # select signature keys
634
 
        signature_filter = revision_keys # same keyspace
 
721
        signature_filter = self._revision_keys # same keyspace
635
722
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
636
723
            self.packs, 'signature_index')[0]
637
724
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
645
732
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
646
733
                new_pack.signature_index.key_count(),
647
734
                time.time() - new_pack.start_time)
648
 
        if not new_pack.data_inserted():
 
735
        self._check_references()
 
736
        if not self._use_pack(new_pack):
649
737
            new_pack.abort()
650
738
            return None
651
739
        self.pb.update("Finishing pack", 5)
703
791
                pb.update("Copied record", record_index)
704
792
                record_index += 1
705
793
 
706
 
    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
707
 
        output_lines=False):
 
794
    def _copy_nodes_graph(self, index_map, writer, write_index,
 
795
        readv_group_iter, total_items, output_lines=False):
708
796
        """Copy knit nodes between packs.
709
797
 
710
798
        :param output_lines: Return lines present in the copied data as
711
 
            an iterator.
 
799
            an iterator of line,version_id.
712
800
        """
713
801
        pb = ui.ui_factory.nested_progress_bar()
714
802
        try:
715
 
            return self._do_copy_nodes_graph(nodes, index_map, writer,
716
 
                write_index, output_lines, pb)
717
 
        finally:
 
803
            for result in self._do_copy_nodes_graph(index_map, writer,
 
804
                write_index, output_lines, pb, readv_group_iter, total_items):
 
805
                yield result
 
806
        except Exception:
 
807
            # Python 2.4 does not permit try:finally: in a generator.
 
808
            pb.finished()
 
809
            raise
 
810
        else:
718
811
            pb.finished()
719
812
 
720
 
    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
721
 
        output_lines, pb):
 
813
    def _do_copy_nodes_graph(self, index_map, writer, write_index,
 
814
        output_lines, pb, readv_group_iter, total_items):
722
815
        # for record verification
723
816
        knit_data = _KnitData(None)
724
817
        # for line extraction when requested (inventories only)
725
818
        if output_lines:
726
819
            factory = knit.KnitPlainFactory()
727
 
        # plan a readv on each source pack:
728
 
        # group by pack
729
 
        nodes = sorted(nodes)
730
 
        # how to map this into knit.py - or knit.py into this?
731
 
        # we don't want the typical knit logic, we want grouping by pack
732
 
        # at this point - perhaps a helper library for the following code 
733
 
        # duplication points?
734
 
        request_groups = {}
735
820
        record_index = 0
736
 
        pb.update("Copied record", record_index, len(nodes))
737
 
        for index, key, value, references in nodes:
738
 
            if index not in request_groups:
739
 
                request_groups[index] = []
740
 
            request_groups[index].append((key, value, references))
741
 
        for index, items in request_groups.iteritems():
742
 
            pack_readv_requests = []
743
 
            for key, value, references in items:
744
 
                # ---- KnitGraphIndex.get_position
745
 
                bits = value[1:].split(' ')
746
 
                offset, length = int(bits[0]), int(bits[1])
747
 
                pack_readv_requests.append((offset, length, (key, value[0], references)))
748
 
            # linear scan up the pack
749
 
            pack_readv_requests.sort()
 
821
        pb.update("Copied record", record_index, total_items)
 
822
        for index, readv_vector, node_vector in readv_group_iter:
750
823
            # copy the data
751
824
            transport, path = index_map[index]
752
 
            reader = pack.make_readv_reader(transport, path,
753
 
                [offset[0:2] for offset in pack_readv_requests])
754
 
            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
755
 
                izip(reader.iter_records(), pack_readv_requests):
 
825
            reader = pack.make_readv_reader(transport, path, readv_vector)
 
826
            for (names, read_func), (key, eol_flag, references) in \
 
827
                izip(reader.iter_records(), node_vector):
756
828
                raw_data = read_func(None)
 
829
                version_id = key[-1]
757
830
                if output_lines:
758
831
                    # read the entire thing
759
 
                    content, _ = knit_data._parse_record(key[-1], raw_data)
 
832
                    content, _ = knit_data._parse_record(version_id, raw_data)
760
833
                    if len(references[-1]) == 0:
761
834
                        line_iterator = factory.get_fulltext_content(content)
762
835
                    else:
763
836
                        line_iterator = factory.get_linedelta_content(content)
764
837
                    for line in line_iterator:
765
 
                        yield line
 
838
                        yield line, version_id
766
839
                else:
767
840
                    # check the header only
768
 
                    df, _ = knit_data._parse_record_header(key[-1], raw_data)
 
841
                    df, _ = knit_data._parse_record_header(version_id, raw_data)
769
842
                    df.close()
770
843
                pos, size = writer.add_bytes_record(raw_data, names)
771
844
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
772
845
                pb.update("Copied record", record_index)
773
846
                record_index += 1
774
847
 
 
848
    def _get_text_nodes(self):
 
849
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
 
850
            self.packs, 'text_index')[0]
 
851
        return text_index_map, self._pack_collection._index_contents(text_index_map,
 
852
            self._text_filter)
 
853
 
 
854
    def _least_readv_node_readv(self, nodes):
 
855
        """Generate request groups for nodes using the least readv's.
 
856
        
 
857
        :param nodes: An iterable of graph index nodes.
 
858
        :return: Total node count and an iterator of the data needed to perform
 
859
            readvs to obtain the data for nodes. Each item yielded by the
 
860
            iterator is a tuple with:
 
861
            index, readv_vector, node_vector. readv_vector is a list ready to
 
862
            hand to the transport readv method, and node_vector is a list of
 
863
            (key, eol_flag, references) for the the node retrieved by the
 
864
            matching readv_vector.
 
865
        """
 
866
        # group by pack so we do one readv per pack
 
867
        nodes = sorted(nodes)
 
868
        total = len(nodes)
 
869
        request_groups = {}
 
870
        for index, key, value, references in nodes:
 
871
            if index not in request_groups:
 
872
                request_groups[index] = []
 
873
            request_groups[index].append((key, value, references))
 
874
        result = []
 
875
        for index, items in request_groups.iteritems():
 
876
            pack_readv_requests = []
 
877
            for key, value, references in items:
 
878
                # ---- KnitGraphIndex.get_position
 
879
                bits = value[1:].split(' ')
 
880
                offset, length = int(bits[0]), int(bits[1])
 
881
                pack_readv_requests.append(
 
882
                    ((offset, length), (key, value[0], references)))
 
883
            # linear scan up the pack to maximum range combining.
 
884
            pack_readv_requests.sort()
 
885
            # split out the readv and the node data.
 
886
            pack_readv = [readv for readv, node in pack_readv_requests]
 
887
            node_vector = [node for readv, node in pack_readv_requests]
 
888
            result.append((index, pack_readv, node_vector))
 
889
        return total, result
 
890
 
 
891
    def _log_copied_texts(self):
 
892
        if 'pack' in debug.debug_flags:
 
893
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
 
894
                time.ctime(), self._pack_collection._upload_transport.base,
 
895
                self.new_pack.random_name,
 
896
                self.new_pack.text_index.key_count(),
 
897
                time.time() - self.new_pack.start_time)
 
898
 
 
899
    def _process_inventory_lines(self, inv_lines):
 
900
        """Use up the inv_lines generator and setup a text key filter."""
 
901
        repo = self._pack_collection.repo
 
902
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
 
903
            inv_lines, self.revision_ids)
 
904
        text_filter = []
 
905
        for fileid, file_revids in fileid_revisions.iteritems():
 
906
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
 
907
        self._text_filter = text_filter
 
908
 
 
909
    def _revision_node_readv(self, revision_nodes):
 
910
        """Return the total revisions and the readv's to issue.
 
911
 
 
912
        :param revision_nodes: The revision index contents for the packs being
 
913
            incorporated into the new pack.
 
914
        :return: As per _least_readv_node_readv.
 
915
        """
 
916
        return self._least_readv_node_readv(revision_nodes)
 
917
 
 
918
    def _use_pack(self, new_pack):
 
919
        """Return True if new_pack should be used.
 
920
 
 
921
        :param new_pack: The pack that has just been created.
 
922
        :return: True if the pack should be used.
 
923
        """
 
924
        return new_pack.data_inserted()
 
925
 
 
926
 
 
927
class OptimisingPacker(Packer):
 
928
    """A packer which spends more time to create better disk layouts."""
 
929
 
 
930
    def _revision_node_readv(self, revision_nodes):
 
931
        """Return the total revisions and the readv's to issue.
 
932
 
 
933
        This sort places revisions in topological order with the ancestors
 
934
        after the children.
 
935
 
 
936
        :param revision_nodes: The revision index contents for the packs being
 
937
            incorporated into the new pack.
 
938
        :return: As per _least_readv_node_readv.
 
939
        """
 
940
        # build an ancestors dict
 
941
        ancestors = {}
 
942
        by_key = {}
 
943
        for index, key, value, references in revision_nodes:
 
944
            ancestors[key] = references[0]
 
945
            by_key[key] = (index, value, references)
 
946
        order = tsort.topo_sort(ancestors)
 
947
        total = len(order)
 
948
        # Single IO is pathological, but it will work as a starting point.
 
949
        requests = []
 
950
        for key in reversed(order):
 
951
            index, value, references = by_key[key]
 
952
            # ---- KnitGraphIndex.get_position
 
953
            bits = value[1:].split(' ')
 
954
            offset, length = int(bits[0]), int(bits[1])
 
955
            requests.append(
 
956
                (index, [(offset, length)], [(key, value[0], references)]))
 
957
        # TODO: combine requests in the same index that are in ascending order.
 
958
        return total, requests
 
959
 
775
960
 
776
961
class ReconcilePacker(Packer):
777
962
    """A packer which regenerates indices etc as it copies.
780
965
    regenerated.
781
966
    """
782
967
 
 
968
    def _extra_init(self):
 
969
        self._data_changed = False
 
970
 
 
971
    def _process_inventory_lines(self, inv_lines):
 
972
        """Generate a text key reference map rather for reconciling with."""
 
973
        repo = self._pack_collection.repo
 
974
        refs = repo._find_text_key_references_from_xml_inventory_lines(
 
975
            inv_lines)
 
976
        self._text_refs = refs
 
977
        # during reconcile we:
 
978
        #  - convert unreferenced texts to full texts
 
979
        #  - correct texts which reference a text not copied to be full texts
 
980
        #  - copy all others as-is but with corrected parents.
 
981
        #  - so at this point we don't know enough to decide what becomes a full
 
982
        #    text.
 
983
        self._text_filter = None
 
984
 
 
985
    def _copy_text_texts(self):
 
986
        """generate what texts we should have and then copy."""
 
987
        self.pb.update("Copying content texts", 3)
 
988
        # we have three major tasks here:
 
989
        # 1) generate the ideal index
 
990
        repo = self._pack_collection.repo
 
991
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
 
992
            _1, key, _2, refs in 
 
993
            self.new_pack.revision_index.iter_all_entries()])
 
994
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
 
995
        # 2) generate a text_nodes list that contains all the deltas that can
 
996
        #    be used as-is, with corrected parents.
 
997
        ok_nodes = []
 
998
        bad_texts = []
 
999
        discarded_nodes = []
 
1000
        NULL_REVISION = _mod_revision.NULL_REVISION
 
1001
        text_index_map, text_nodes = self._get_text_nodes()
 
1002
        for node in text_nodes:
 
1003
            # 0 - index
 
1004
            # 1 - key 
 
1005
            # 2 - value
 
1006
            # 3 - refs
 
1007
            try:
 
1008
                ideal_parents = tuple(ideal_index[node[1]])
 
1009
            except KeyError:
 
1010
                discarded_nodes.append(node)
 
1011
                self._data_changed = True
 
1012
            else:
 
1013
                if ideal_parents == (NULL_REVISION,):
 
1014
                    ideal_parents = ()
 
1015
                if ideal_parents == node[3][0]:
 
1016
                    # no change needed.
 
1017
                    ok_nodes.append(node)
 
1018
                elif ideal_parents[0:1] == node[3][0][0:1]:
 
1019
                    # the left most parent is the same, or there are no parents
 
1020
                    # today. Either way, we can preserve the representation as
 
1021
                    # long as we change the refs to be inserted.
 
1022
                    self._data_changed = True
 
1023
                    ok_nodes.append((node[0], node[1], node[2],
 
1024
                        (ideal_parents, node[3][1])))
 
1025
                    self._data_changed = True
 
1026
                else:
 
1027
                    # Reinsert this text completely
 
1028
                    bad_texts.append((node[1], ideal_parents))
 
1029
                    self._data_changed = True
 
1030
        # we're finished with some data.
 
1031
        del ideal_index
 
1032
        del text_nodes
 
1033
        # 3) bulk copy the ok data
 
1034
        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
 
1035
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
 
1036
            self.new_pack.text_index, readv_group_iter, total_items))
 
1037
        # 4) adhoc copy all the other texts.
 
1038
        # We have to topologically insert all texts otherwise we can fail to
 
1039
        # reconcile when parts of a single delta chain are preserved intact,
 
1040
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
 
1041
        # reinserted, and if d3 has incorrect parents it will also be
 
1042
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
 
1043
        # copied), so we will try to delta, but d2 is not currently able to be
 
1044
        # extracted because it's basis d1 is not present. Topologically sorting
 
1045
        # addresses this. The following generates a sort for all the texts that
 
1046
        # are being inserted without having to reference the entire text key
 
1047
        # space (we only topo sort the revisions, which is smaller).
 
1048
        topo_order = tsort.topo_sort(ancestors)
 
1049
        rev_order = dict(zip(topo_order, range(len(topo_order))))
 
1050
        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
 
1051
        transaction = repo.get_transaction()
 
1052
        file_id_index = GraphIndexPrefixAdapter(
 
1053
            self.new_pack.text_index,
 
1054
            ('blank', ), 1,
 
1055
            add_nodes_callback=self.new_pack.text_index.add_nodes)
 
1056
        knit_index = KnitGraphIndex(file_id_index,
 
1057
            add_callback=file_id_index.add_nodes,
 
1058
            deltas=True, parents=True)
 
1059
        output_knit = knit.KnitVersionedFile('reconcile-texts',
 
1060
            self._pack_collection.transport,
 
1061
            None,
 
1062
            index=knit_index,
 
1063
            access_method=_PackAccess(
 
1064
                {self.new_pack.text_index:self.new_pack.access_tuple()},
 
1065
                (self.new_pack._writer, self.new_pack.text_index)),
 
1066
            factory=knit.KnitPlainFactory())
 
1067
        for key, parent_keys in bad_texts:
 
1068
            # We refer to the new pack to delta data being output.
 
1069
            # A possible improvement would be to catch errors on short reads
 
1070
            # and only flush then.
 
1071
            self.new_pack.flush()
 
1072
            parents = []
 
1073
            for parent_key in parent_keys:
 
1074
                if parent_key[0] != key[0]:
 
1075
                    # Graph parents must match the fileid
 
1076
                    raise errors.BzrError('Mismatched key parent %r:%r' %
 
1077
                        (key, parent_keys))
 
1078
                parents.append(parent_key[1])
 
1079
            source_weave = repo.weave_store.get_weave(key[0], transaction)
 
1080
            text_lines = source_weave.get_lines(key[1])
 
1081
            # adapt the 'knit' to the current file_id.
 
1082
            file_id_index = GraphIndexPrefixAdapter(
 
1083
                self.new_pack.text_index,
 
1084
                (key[0], ), 1,
 
1085
                add_nodes_callback=self.new_pack.text_index.add_nodes)
 
1086
            knit_index._graph_index = file_id_index
 
1087
            knit_index._add_callback = file_id_index.add_nodes
 
1088
            output_knit.add_lines_with_ghosts(
 
1089
                key[1], parents, text_lines, random_id=True, check_content=False)
 
1090
        # 5) check that nothing inserted has a reference outside the keyspace.
 
1091
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
 
1092
        if missing_text_keys:
 
1093
            raise errors.BzrError('Reference to missing compression parents %r'
 
1094
                % (refs - keys,))
 
1095
        self._log_copied_texts()
 
1096
 
 
1097
    def _use_pack(self, new_pack):
 
1098
        """Override _use_pack to check for reconcile having changed content."""
 
1099
        # XXX: we might be better checking this at the copy time.
 
1100
        original_inventory_keys = set()
 
1101
        inv_index = self._pack_collection.inventory_index.combined_index
 
1102
        for entry in inv_index.iter_all_entries():
 
1103
            original_inventory_keys.add(entry[1])
 
1104
        new_inventory_keys = set()
 
1105
        for entry in new_pack.inventory_index.iter_all_entries():
 
1106
            new_inventory_keys.add(entry[1])
 
1107
        if new_inventory_keys != original_inventory_keys:
 
1108
            self._data_changed = True
 
1109
        return new_pack.data_inserted() and self._data_changed
 
1110
 
783
1111
 
784
1112
class RepositoryPackCollection(object):
785
1113
    """Management of packs within a repository."""
900
1228
        self._execute_pack_operations(pack_operations)
901
1229
        return True
902
1230
 
903
 
    def _execute_pack_operations(self, pack_operations):
 
1231
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
904
1232
        """Execute a series of pack operations.
905
1233
 
906
1234
        :param pack_operations: A list of [revision_count, packs_to_combine].
 
1235
        :param _packer_class: The class of packer to use (default: Packer).
907
1236
        :return: None.
908
1237
        """
909
1238
        for revision_count, packs in pack_operations:
910
1239
            # we may have no-ops from the setup logic
911
1240
            if len(packs) == 0:
912
1241
                continue
913
 
            Packer(self, packs, '.autopack').pack()
 
1242
            _packer_class(self, packs, '.autopack').pack()
914
1243
            for pack in packs:
915
1244
                self._remove_pack_from_memory(pack)
916
1245
        # record the newly available packs and stop advertising the old
933
1262
        self.ensure_loaded()
934
1263
        total_packs = len(self._names)
935
1264
        if total_packs < 2:
 
1265
            # This is arguably wrong because we might not be optimal, but for
 
1266
            # now lets leave it in. (e.g. reconcile -> one pack. But not
 
1267
            # optimal.
936
1268
            return
937
1269
        total_revisions = self.revision_index.combined_index.key_count()
938
1270
        # XXX: the following may want to be a class, to pack with a given
944
1276
        pack_distribution = [1]
945
1277
        pack_operations = [[0, []]]
946
1278
        for pack in self.all_packs():
947
 
            revision_count = pack.get_revision_count()
948
 
            pack_operations[-1][0] += revision_count
 
1279
            pack_operations[-1][0] += pack.get_revision_count()
949
1280
            pack_operations[-1][1].append(pack)
950
 
        self._execute_pack_operations(pack_operations)
 
1281
        self._execute_pack_operations(pack_operations, OptimisingPacker)
951
1282
 
952
1283
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
953
1284
        """Plan a pack operation.
994
1325
    def ensure_loaded(self):
995
1326
        # NB: if you see an assertion error here, its probably access against
996
1327
        # an unlocked repo. Naughty.
997
 
        assert self.repo.is_locked()
 
1328
        if not self.repo.is_locked():
 
1329
            raise errors.ObjectNotLocked(self.repo)
998
1330
        if self._names is None:
999
1331
            self._names = {}
1000
1332
            self._packs_at_load = set()
1035
1367
        """
1036
1368
        self.ensure_loaded()
1037
1369
        if a_new_pack.name in self._names:
1038
 
            # a collision with the packs we know about (not the only possible
1039
 
            # collision, see NewPack.finish() for some discussion). Remove our
1040
 
            # prior reference to it.
1041
 
            self._remove_pack_from_memory(a_new_pack)
 
1370
            raise errors.BzrError(
 
1371
                'Pack %r already exists in %s' % (a_new_pack.name, self))
1042
1372
        self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
1043
1373
        self.add_pack_to_memory(a_new_pack)
1044
1374
 
1244
1574
            # changing it.
1245
1575
            for key, value in disk_nodes:
1246
1576
                builder.add_node(key, value)
1247
 
            self.transport.put_file('pack-names', builder.finish())
 
1577
            self.transport.put_file('pack-names', builder.finish(),
 
1578
                mode=self.repo.control_files._file_mode)
1248
1579
            # move the baseline forward
1249
1580
            self._packs_at_load = disk_nodes
1250
1581
            # now clear out the obsolete packs directory
1287
1618
        if not self.repo.is_write_locked():
1288
1619
            raise errors.NotWriteLocked(self)
1289
1620
        self._new_pack = NewPack(self._upload_transport, self._index_transport,
1290
 
            self._pack_transport, upload_suffix='.pack')
 
1621
            self._pack_transport, upload_suffix='.pack',
 
1622
            file_mode=self.repo.control_files._file_mode)
1291
1623
        # allow writing: queue writes to a new index
1292
1624
        self.revision_index.add_writable_index(self._new_pack.revision_index,
1293
1625
            self._new_pack)
1507
1839
        self._transaction = None
1508
1840
        # for tests
1509
1841
        self._reconcile_does_inventory_gc = True
1510
 
        self._reconcile_fixes_text_parents = False
 
1842
        self._reconcile_fixes_text_parents = True
1511
1843
        self._reconcile_backsup_inventory = False
1512
1844
 
1513
1845
    def _abort_write_group(self):
1528
1860
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
1529
1861
            parents-in-revision).
1530
1862
        """
1531
 
        assert self.is_locked()
 
1863
        if not self.is_locked():
 
1864
            raise errors.ObjectNotLocked(self)
1532
1865
        pb = ui.ui_factory.nested_progress_bar()
1533
1866
        result = []
1534
1867
        try:
1803
2136
    _serializer = xml5.serializer_v5
1804
2137
 
1805
2138
    def _get_matching_bzrdir(self):
1806
 
        return bzrdir.format_registry.make_bzrdir('knitpack-experimental')
 
2139
        return bzrdir.format_registry.make_bzrdir('pack-0.92')
1807
2140
 
1808
2141
    def _ignore_setting_bzrdir(self, format):
1809
2142
        pass
1840
2173
 
1841
2174
    def _get_matching_bzrdir(self):
1842
2175
        return bzrdir.format_registry.make_bzrdir(
1843
 
            'knitpack-subtree-experimental')
 
2176
            'pack-0.92-subtree')
1844
2177
 
1845
2178
    def _ignore_setting_bzrdir(self, format):
1846
2179
        pass
1862
2195
    def get_format_description(self):
1863
2196
        """See RepositoryFormat.get_format_description()."""
1864
2197
        return "Packs containing knits with subtree support\n"
 
2198
 
 
2199
 
 
2200
class RepositoryFormatKnitPack4(RepositoryFormatPack):
 
2201
    """A rich-root, no subtrees parameterised Pack repository.
 
2202
 
 
2203
    This repository format uses the xml6 serializer to get:
 
2204
     - support for recording full info about the tree root
 
2205
 
 
2206
    This format was introduced in 1.0.
 
2207
    """
 
2208
 
 
2209
    repository_class = KnitPackRepository
 
2210
    _commit_builder_class = PackRootCommitBuilder
 
2211
    rich_root_data = True
 
2212
    supports_tree_reference = False
 
2213
    _serializer = xml6.serializer_v6
 
2214
 
 
2215
    def _get_matching_bzrdir(self):
 
2216
        return bzrdir.format_registry.make_bzrdir(
 
2217
            'rich-root-pack')
 
2218
 
 
2219
    def _ignore_setting_bzrdir(self, format):
 
2220
        pass
 
2221
 
 
2222
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
2223
 
 
2224
    def check_conversion_target(self, target_format):
 
2225
        if not target_format.rich_root_data:
 
2226
            raise errors.BadConversionTarget(
 
2227
                'Does not support rich root data.', target_format)
 
2228
 
 
2229
    def get_format_string(self):
 
2230
        """See RepositoryFormat.get_format_string()."""
 
2231
        return ("Bazaar pack repository format 1 with rich root"
 
2232
                " (needs bzr 1.0)\n")
 
2233
 
 
2234
    def get_format_description(self):
 
2235
        """See RepositoryFormat.get_format_description()."""
 
2236
        return "Packs containing knits with rich root support\n"