/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Martin Pool
  • Date: 2007-12-14 07:35:49 UTC
  • mfrom: (3109 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3111.
  • Revision ID: mbp@sourcefrog.net-20071214073549-wpekccrsxjv77yze
Merge 1.0final back to trunk and bump to 1.1dev

@@ -610,8 +610,9 @@
         revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         self.pb.update("Copying revision texts", 1)
-        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
-            self.new_pack._writer, self.new_pack.revision_index))
+        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._pack_collection._upload_transport.base,
@@ -639,8 +640,10 @@
         # XXX: Should be a helper function to allow different inv representation
         # at this point.
         self.pb.update("Copying inventory texts", 2)
-        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=True)
         if self.revision_ids:
             self._process_inventory_lines(inv_lines)
         else:
@@ -675,8 +678,9 @@
                     a_missing_key[0])
         # copy text keys and adjust values
         self.pb.update("Copying content texts", 3)
-        list(self._copy_nodes_graph(text_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         self._log_copied_texts()

     def _check_references(self):
@@ -787,8 +791,8 @@
                 pb.update("Copied record", record_index)
                 record_index += 1

-    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines=False):
+    def _copy_nodes_graph(self, index_map, writer, write_index,
+        readv_group_iter, total_items, output_lines=False):
         """Copy knit nodes between packs.

         :param output_lines: Return lines present in the copied data as
@@ -796,8 +800,8 @@
         """
         pb = ui.ui_factory.nested_progress_bar()
         try:
-            for result in self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb):
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
                 yield result
         except Exception:
             # Python 2.4 does not permit try:finally: in a generator.
@@ -806,42 +810,21 @@
         else:
             pb.finished()

-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines, pb):
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
         # for record verification
         knit_data = _KnitData(None)
         # for line extraction when requested (inventories only)
         if output_lines:
             factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        request_groups = {}
         record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
             # copy the data
             transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
+            reader = pack.make_readv_reader(transport, path, readv_vector)
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
                 raw_data = read_func(None)
                 version_id = key[-1]
                 if output_lines:
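
For orientation, here is a minimal sketch (not from pack_repo.py; all values are toy data) of the contract the rewritten loop relies on: each item of readv_group_iter is an (index, readv_vector, node_vector) triple for one source pack, with the two vectors aligned element for element.

    # Toy stand-in for the planner output; real vectors come from
    # _least_readv_node_readv / _revision_node_readv.
    readv_group_iter = iter([
        ('idx-A',                       # one triple per source pack
         [(0, 100), (100, 50)],         # readv_vector: (offset, length) pairs
         [(('rev-1',), ' ', ()),        # node_vector: (key, eol_flag, references)
          (('rev-2',), ' ', (('rev-1',),))]),
    ])
    for index, readv_vector, node_vector in readv_group_iter:
        for (offset, length), (key, eol_flag, references) in zip(
                readv_vector, node_vector):
            print(index, key, offset, length)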
@@ -868,6 +851,43 @@
         return text_index_map, self._pack_collection._index_contents(text_index_map,
             self._text_filter)

+    def _least_readv_node_readv(self, nodes):
+        """Generate request groups for nodes using the least readv's.
+
+        :param nodes: An iterable of graph index nodes.
+        :return: Total node count and an iterator of the data needed to perform
+            readvs to obtain the data for nodes. Each item yielded by the
+            iterator is a tuple with:
+            index, readv_vector, node_vector. readv_vector is a list ready to
+            hand to the transport readv method, and node_vector is a list of
+            (key, eol_flag, references) for the node retrieved by the
+            matching readv_vector.
+        """
+        # group by pack so we do one readv per pack
+        nodes = sorted(nodes)
+        total = len(nodes)
+        request_groups = {}
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        result = []
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append(
+                    ((offset, length), (key, value[0], references)))
+            # linear scan up the pack to maximum range combining.
+            pack_readv_requests.sort()
+            # split out the readv and the node data.
+            pack_readv = [readv for readv, node in pack_readv_requests]
+            node_vector = [node for readv, node in pack_readv_requests]
+            result.append((index, pack_readv, node_vector))
+        return total, result
+
     def _log_copied_texts(self):
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
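
The docstring above describes the return shape; the following standalone sketch (assuming toy GraphIndex-style node tuples and value strings of the form '<eol_flag><offset> <length>') reproduces the grouping so the shape is concrete:

    def least_readv_groups(nodes):
        # Group by source index so each pack needs only one readv call.
        nodes = sorted(nodes)
        groups = {}
        for index, key, value, references in nodes:
            groups.setdefault(index, []).append((key, value, references))
        result = []
        for index, items in groups.items():
            requests = []
            for key, value, references in items:
                # value looks like '<eol_flag><offset> <length>'
                offset, length = [int(b) for b in value[1:].split(' ')]
                requests.append(((offset, length), (key, value[0], references)))
            requests.sort()  # ascending offsets let the transport combine ranges
            result.append((index,
                           [readv for readv, node in requests],
                           [node for readv, node in requests]))
        return len(nodes), result

    # toy usage: two records in one pack, deliberately given out of order
    total, groups = least_readv_groups(
        [('idx-A', ('rev-2',), ' 10 20', ()),
         ('idx-A', ('rev-1',), ' 0 10', ())])
    # total == 2; groups == [('idx-A', [(0, 10), (10, 20)],
    #     [(('rev-1',), ' ', ()), (('rev-2',), ' ', ())])]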
@@ -886,6 +906,15 @@
             text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
         self._text_filter = text_filter

+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        return self._least_readv_node_readv(revision_nodes)
+
     def _use_pack(self, new_pack):
         """Return True if new_pack should be used.

@@ -895,6 +924,40 @@
         return new_pack.data_inserted()


+class OptimisingPacker(Packer):
+    """A packer which spends more time to create better disk layouts."""
+
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        This sort places revisions in topological order with the ancestors
+        after the children.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        # build an ancestors dict
+        ancestors = {}
+        by_key = {}
+        for index, key, value, references in revision_nodes:
+            ancestors[key] = references[0]
+            by_key[key] = (index, value, references)
+        order = tsort.topo_sort(ancestors)
+        total = len(order)
+        # Single IO is pathological, but it will work as a starting point.
+        requests = []
+        for key in reversed(order):
+            index, value, references = by_key[key]
+            # ---- KnitGraphIndex.get_position
+            bits = value[1:].split(' ')
+            offset, length = int(bits[0]), int(bits[1])
+            requests.append(
+                (index, [(offset, length)], [(key, value[0], references)]))
+        # TODO: combine requests in the same index that are in ascending order.
+        return total, requests
+
+
 class ReconcilePacker(Packer):
     """A packer which regenerates indices etc as it copies.

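
The hunk above relies on bzrlib's tsort.topo_sort; as a hedged illustration of the ordering involved (assuming, as the code implies, that it yields parents before children), here is a plain-Python equivalent and the reversal that puts children ahead of their ancestors:

    def topo_sort(ancestors):
        # Kahn-style sort over {key: (parent_key, ...)}: a key is emitted
        # only after every parent that is itself present in the graph.
        order, emitted = [], set()
        pending = sorted(ancestors)
        while pending:
            remaining = []
            for key in pending:
                parents = [p for p in ancestors[key] if p in ancestors]
                if all(p in emitted for p in parents):
                    order.append(key)
                    emitted.add(key)
                else:
                    remaining.append(key)
            if len(remaining) == len(pending):
                raise ValueError('cycle in ancestry graph')
            pending = remaining
        return order

    # toy ancestry: rev-c has parent rev-b, which has parent rev-a
    ancestors = {'rev-a': (), 'rev-b': ('rev-a',), 'rev-c': ('rev-b',)}
    order = topo_sort(ancestors)     # ['rev-a', 'rev-b', 'rev-c']
    layout = list(reversed(order))   # ['rev-c', 'rev-b', 'rev-a']

Reversing the parents-first order means a reader streaming the pack front to back sees each revision before its ancestors, matching the docstring's "ancestors after the children".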
@@ -968,8 +1031,9 @@
         del ideal_index
         del text_nodes
         # 3) bulk copy the ok data
-        list(self._copy_nodes_graph(ok_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         # 4) adhoc copy all the other texts.
         # We have to topologically insert all texts otherwise we can fail to
         # reconcile when parts of a single delta chain are preserved intact,
@@ -1164,17 +1228,18 @@
         self._execute_pack_operations(pack_operations)
         return True

-    def _execute_pack_operations(self, pack_operations):
+    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
         """Execute a series of pack operations.

         :param pack_operations: A list of [revision_count, packs_to_combine].
+        :param _packer_class: The class of packer to use (default: Packer).
         :return: None.
         """
         for revision_count, packs in pack_operations:
             # we may have no-ops from the setup logic
             if len(packs) == 0:
                 continue
-            Packer(self, packs, '.autopack').pack()
+            _packer_class(self, packs, '.autopack').pack()
             for pack in packs:
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
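
A hedged sketch of the pattern this hunk introduces: the packer strategy is injected as a class, so callers can swap disk layouts without touching the copy loop. The classes below are illustrative stubs, not the real bzrlib implementations:

    class Packer(object):
        """Baseline strategy: copy nodes in least-readv order."""
        def __init__(self, collection, packs, suffix):
            self.packs = packs
        def pack(self):
            return 'least-readv layout'

    class OptimisingPacker(Packer):
        """Slower strategy: topological, children-first layout."""
        def pack(self):
            return 'topological layout'

    def execute_pack_operations(pack_operations, _packer_class=Packer):
        # The caller picks the strategy; the loop itself never changes.
        for revision_count, packs in pack_operations:
            if len(packs) == 0:
                continue
            _packer_class(None, packs, '.autopack').pack()

    execute_pack_operations([[10, ['pack-a', 'pack-b']]])
    execute_pack_operations([[10, ['pack-a', 'pack-b']]], OptimisingPacker)

Autopack keeps the default Packer for speed, while an explicit pack() (see the final hunk below) opts into OptimisingPacker.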
@@ -1197,6 +1262,9 @@
         self.ensure_loaded()
         total_packs = len(self._names)
         if total_packs < 2:
+            # This is arguably wrong because we might not be optimal, but for
+            # now let's leave it in (e.g. reconcile -> one pack, but not
+            # optimal).
             return
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
@@ -1208,10 +1276,9 @@
         pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            revision_count = pack.get_revision_count()
-            pack_operations[-1][0] += revision_count
+            pack_operations[-1][0] += pack.get_revision_count()
             pack_operations[-1][1].append(pack)
-        self._execute_pack_operations(pack_operations)
+        self._execute_pack_operations(pack_operations, OptimisingPacker)

     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.