@@ -610,8 +610,9 @@
         revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         self.pb.update("Copying revision texts", 1)
-        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
-            self.new_pack._writer, self.new_pack.revision_index))
+        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._pack_collection._upload_transport.base,
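
The change threads a precomputed readv plan through _copy_nodes_graph
instead of letting it rebuild one from raw index nodes. A minimal sketch
of the shape readv_group_iter is expected to yield, matching the new
unpacking in _do_copy_nodes_graph below (the index object, offsets and
keys are hypothetical):

    def example_readv_group_iter(index):
        # One group per source index: (index, readv_vector, node_vector).
        # readv_vector holds (offset, length) pairs ready for
        # transport.readv; node_vector holds the matching
        # (key, eol_flag, references) tuples, in the same order.
        readv_vector = [(0, 120), (120, 300)]
        node_vector = [(('rev-1',), ' ', ((),)),
                       (('rev-2',), ' ', ((('rev-1',),),))]
        yield index, readv_vector, node_vector

    total_items = 2  # sum of len(node_vector) across all groups
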
@@ -639,7 +640,9 @@
         # XXX: Should be a helper function to allow different inv representation
         # at this point.
         self.pb.update("Copying inventory texts", 2)
-        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=True)
         if self.revision_ids:
             self._process_inventory_lines(inv_lines)
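
With output_lines=True the copy doubles as an extraction pass:
_copy_nodes_graph returns an iterator of line,version_id pairs from the
copied inventory texts, which _process_inventory_lines scans to build the
text filter. A hedged sketch of that consumption pattern (the helper
names are illustrative, not bzrlib API):

    def process_inventory_lines(inv_lines, extract_fileid_revids):
        # extract_fileid_revids stands in for real inventory parsing;
        # it maps one inventory line to (file_id, file_revid) pairs.
        text_filter = []
        for line, version_id in inv_lines:
            text_filter.extend(extract_fileid_revids(line))
        return text_filter
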
@@ -675,8 +678,9 @@
                 a_missing_key[0])
         # copy text keys and adjust values
         self.pb.update("Copying content texts", 3)
-        list(self._copy_nodes_graph(text_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         self._log_copied_texts()
 
     def _check_references(self):
@@ -787,17 +791,17 @@
                 pb.update("Copied record", record_index)
                 record_index += 1
 
-    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines=False):
+    def _copy_nodes_graph(self, index_map, writer, write_index,
+        readv_group_iter, total_items, output_lines=False):
         """Copy knit nodes between packs.
 
         :param output_lines: Return lines present in the copied data as
             an iterator of line,version_id.
         """
         pb = ui.ui_factory.nested_progress_bar()
         try:
-            for result in self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb):
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
                 yield result
         except Exception:
             # Python 2.4 does not permit try:finally: in a generator.
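
The except/else dance exists because Python 2.4 forbids try:finally:
around a yield, yet the progress bar must be finished on every exit path.
The same pattern in isolation (any object with a finished() method will
do):

    def copy_with_progress(results, pb):
        # Emulates try:finally: for a Python 2.4 generator: finish the
        # progress bar on error and on normal completion alike.
        try:
            for result in results:
                yield result
        except Exception:
            pb.finished()
            raise
        else:
            pb.finished()
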
@@ -809,38 +813,17 @@
-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines, pb):
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
         # for record verification
         knit_data = _KnitData(None)
         # for line extraction when requested (inventories only)
         if output_lines:
             factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        request_groups = {}
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
+        record_index = 0
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
             # copy the data
             transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
+            reader = pack.make_readv_reader(transport, path, readv_vector)
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
                 raw_data = read_func(None)
                 version_id = key[-1]
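
The "---- KnitGraphIndex.get_position" lines decode a GraphIndex value
string: byte 0 is the no-eol flag, the rest is "offset length" within the
pack. A worked example (the value string is invented):

    value = 'N1234 567'           # flag byte, then "offset length"
    eol_flag = value[0]           # 'N' = text lacks a trailing newline
    bits = value[1:].split(' ')   # ['1234', '567']
    offset, length = int(bits[0]), int(bits[1])
    assert (offset, length) == (1234, 567)
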
@@ -868,6 +851,43 @@
         return text_index_map, self._pack_collection._index_contents(text_index_map,
             self._text_filter)
 
+    def _least_readv_node_readv(self, nodes):
+        """Generate request groups for nodes using the least readv's.
+
+        :param nodes: An iterable of graph index nodes.
+        :return: Total node count and an iterator of the data needed to perform
+            readvs to obtain the data for nodes. Each item yielded by the
+            iterator is a tuple with:
+            index, readv_vector, node_vector. readv_vector is a list ready to
+            hand to the transport readv method, and node_vector is a list of
+            (key, eol_flag, references) for the node retrieved by the
+            matching readv_vector.
+        """
+        # group by pack so we do one readv per pack
+        nodes = sorted(nodes)
+        total = len(nodes)
+        request_groups = {}
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        result = []
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append(
+                    ((offset, length), (key, value[0], references)))
+            # linear scan up the pack to maximum range combining.
+            pack_readv_requests.sort()
+            # split out the readv and the node data.
+            pack_readv = [readv for readv, node in pack_readv_requests]
+            node_vector = [node for readv, node in pack_readv_requests]
+            result.append((index, pack_readv, node_vector))
+        return total, result
+
     def _log_copied_texts(self):
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
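
To make _least_readv_node_readv concrete, here is the transform on toy
data (string stand-ins for index objects; values use the flag byte
followed by "offset length", all invented):

    nodes = [
        ('pack-a', ('rev-2',), ' 700 50', ((('rev-1',),),)),
        ('pack-a', ('rev-1',), ' 0 700', ((),)),
        ('pack-b', ('rev-3',), 'N0 90', ((('rev-2',),),)),
    ]
    # Grouped by index, each group sorted by offset, this yields:
    # total = 3
    # result = [
    #     ('pack-a', [(0, 700), (700, 50)],
    #      [(('rev-1',), ' ', ((),)), (('rev-2',), ' ', ((('rev-1',),),))]),
    #     ('pack-b', [(0, 90)], [(('rev-3',), 'N', ((('rev-2',),),))]),
    # ]
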
@@ -886,5 +906,14 @@
             text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
         self._text_filter = text_filter
 
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        return self._least_readv_node_readv(revision_nodes)
+
     def _use_pack(self, new_pack):
         """Return True if new_pack should be used.
@@ -895,3 +924,37 @@
         return new_pack.data_inserted()
 
 
+class OptimisingPacker(Packer):
+    """A packer which spends more time to create better disk layouts."""
+
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        This sort places revisions in topological order with the ancestors
+        after the children.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        # build an ancestors dict
+        ancestors = {}
+        by_key = {}
+        for index, key, value, references in revision_nodes:
+            ancestors[key] = references[0]
+            by_key[key] = (index, value, references)
+        order = tsort.topo_sort(ancestors)
+        total = len(order)
+        # Single IO is pathological, but it will work as a starting point.
+        requests = []
+        for key in reversed(order):
+            index, value, references = by_key[key]
+            # ---- KnitGraphIndex.get_position
+            bits = value[1:].split(' ')
+            offset, length = int(bits[0]), int(bits[1])
+            requests.append(
+                (index, [(offset, length)], [(key, value[0], references)]))
+        # TODO: combine requests in the same index that are in ascending order.
+        return total, requests
+
+
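
The optimisation is purely the ordering: revisions are emitted in
reversed topological order (children before their ancestors), one readv
per revision for now. A self-contained sketch of that ordering step
(graph contents invented; bzrlib's tsort.topo_sort replaced by a tiny
equivalent):

    def topo_sort(ancestors):
        # Parents before children; ancestors maps key -> parent keys.
        order, seen = [], set()
        def visit(key):
            if key in seen:
                return
            seen.add(key)
            for parent in ancestors.get(key, ()):
                visit(parent)
            order.append(key)
        for key in sorted(ancestors):
            visit(key)
        return order

    ancestors = {'rev-1': (), 'rev-2': ('rev-1',), 'rev-3': ('rev-2',)}
    order = topo_sort(ancestors)
    assert list(reversed(order)) == ['rev-3', 'rev-2', 'rev-1']
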
@@ -898,2 +961,2 @@
 class ReconcilePacker(Packer):
     """A packer which regenerates indices etc as it copies.
@@ -970,6 +1033,7 @@
         # 3) bulk copy the ok data
-        list(self._copy_nodes_graph(ok_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         # 4) adhoc copy all the other texts.
         # We have to topologically insert all texts otherwise we can fail to
         # reconcile when parts of a single delta chain are preserved intact,
@@ -1164,1 +1228,1 @@
         self._execute_pack_operations(pack_operations)
@@ -1167,14 +1231,15 @@
-    def _execute_pack_operations(self, pack_operations):
+    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
         """Execute a series of pack operations.
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
+        :param _packer_class: The class of packer to use (default: Packer).
         :return: None.
         """
         for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
-            Packer(self, packs, '.autopack').pack()
+            _packer_class(self, packs, '.autopack').pack()
             for pack in packs:
                 self._remove_pack_from_memory(pack)
            # record the newly available packs and stop advertising the old
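
Parameterising the packer class keeps the collection logic untouched
while letting callers pick a layout strategy. The idea in isolation
(simplified stand-ins, not the bzrlib classes):

    class Packer(object):
        def __init__(self, collection, packs, suffix):
            self.packs = packs
        def pack(self):
            return 'plain layout'

    class OptimisingPacker(Packer):
        def pack(self):
            return 'topologically sorted layout'

    def execute_pack_operations(packs, _packer_class=Packer):
        # Default behaviour is unchanged; pack() below passes
        # OptimisingPacker to get the better layout.
        return _packer_class(None, packs, '.autopack').pack()
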
@@ -1208,10 +1276,9 @@
         pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            revision_count = pack.get_revision_count()
-            pack_operations[-1][0] += revision_count
+            pack_operations[-1][0] += pack.get_revision_count()
             pack_operations[-1][1].append(pack)
-        self._execute_pack_operations(pack_operations)
+        self._execute_pack_operations(pack_operations, OptimisingPacker)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.