        """Open a pack for the pack we are creating."""
        return NewPack(self._pack_collection._upload_transport,
            self._pack_collection._index_transport,
            self._pack_collection._pack_transport, upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.control_files._file_mode)

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys
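        # Note: revision keys are 1-tuples such as ('<revision-id>',); the
        # inventory and signature selections below reuse this same keyspace,
        # which is why the keys are stashed on self here.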

    def _copy_inventory_texts(self):
        """Copy the inventory texts to the new pack.

        self._revision_keys is used to determine what inventories to copy.

        Sets self._text_filter appropriately.
        """
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'inventory_index')[0]
        inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=True)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()

    def _check_references(self):
        """Make sure our external references are present."""
        external_refs = self.new_pack._external_compression_parents_of_texts()
        index = self._pack_collection.text_index.combined_index
        found_items = list(index.iter_entries(external_refs))
        if len(found_items) != len(external_refs):
            found_keys = set(k for idx, k, refs, value in found_items)
            missing_items = external_refs - found_keys
            missing_file_id, missing_revision_id = missing_items.pop()
            raise errors.RevisionNotPresent(missing_revision_id,
                missing_file_id)

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'signature_index')[0]
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
            signature_filter)

                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, index_map, writer, write_index,
        readv_group_iter, total_items, output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()

    def _do_copy_nodes_graph(self, index_map, writer, write_index,
        output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit_data = _KnitData(None)
        # for line extraction when requested (inventories only)
        factory = knit.KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path, readv_vector)
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                version_id = key[-1]
                if output_lines:
                    # read the entire thing
                    content, _ = knit_data._parse_record(version_id, raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, version_id
                else:
                    # check the header only
                    df, _ = knit_data._parse_record_header(version_id, raw_data)
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1
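                # Note that the record itself is copied verbatim: read_func()
                # returned the raw record bytes and add_bytes_record() wrote
                # them into the new pack unchanged, so only the index entry
                # (with the new offset and length) had to be rewritten.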

    def _get_text_nodes(self):
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'text_index')[0]
        return text_index_map, self._pack_collection._index_contents(text_index_map,
            self._text_filter)

    def _least_readv_node_readv(self, nodes):
        """Generate request groups for nodes using the least readv's.

        :param nodes: An iterable of graph index nodes.
        :return: Total node count and an iterator of the data needed to perform
            readvs to obtain the data for nodes. Each item yielded by the
            iterator is a tuple with:
            index, readv_vector, node_vector. readv_vector is a list ready to
            hand to the transport readv method, and node_vector is a list of
            (key, eol_flag, references) for the node retrieved by the
            matching readv_vector.
        """
        # group by pack so we do one readv per pack
        nodes = sorted(nodes)
        request_groups = {}
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        result = []
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
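                # The index value layout assumed here (and written back by the
                # matching add_node call in _do_copy_nodes_graph) is a single
                # eol flag byte followed by "<offset> <length>", e.g. a
                # hypothetical " 1024 2048" meaning offset 1024, length 2048.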
            # linear scan up the pack to maximum range combining.
            pack_readv_requests.sort()
            # split out the readv and the node data.
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
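            # A hypothetical grouping: if one source pack's index yielded nodes
            # with ranges (0, 100) and (100, 200), the single entry appended
            # for that pack is (index, [(0, 100), (100, 200)], [node_a, node_b]),
            # so all of its records are fetched with one readv call.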

    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and setup a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_ids)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter
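        # The filter built here is a list of (file_id, revision_id) tuples,
        # the same keyspace the text index uses, so _get_text_nodes can pass
        # it straight to _index_contents.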

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        return self._least_readv_node_readv(revision_nodes)

    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.

        :param new_pack: The pack that has just been created.
        :return: True if the pack should be used.
        """
        return new_pack.data_inserted()


class OptimisingPacker(Packer):
    """A packer which spends more time to create better disk layouts."""

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        This sort places revisions in topological order with the ancestors
        after the children.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        # build an ancestors dict
        ancestors = {}
        by_key = {}
        for index, key, value, references in revision_nodes:
            ancestors[key] = references[0]
            by_key[key] = (index, value, references)
        order = tsort.topo_sort(ancestors)
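        # topo_sort yields ancestors before descendants; iterating it in
        # reverse below therefore emits each child before its ancestors,
        # giving the "ancestors after the children" layout described above.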
        total = len(order)
        # Single IO is pathological, but it will work as a starting point.
        requests = []
        for key in reversed(order):
            index, value, references = by_key[key]
            # ---- KnitGraphIndex.get_position
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            requests.append(
                (index, [(offset, length)], [(key, value[0], references)]))
        # TODO: combine requests in the same index that are in ascending order.
        return total, requests


class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.
    """

    def _extra_init(self):
        self._data_changed = False

    def _process_inventory_lines(self, inv_lines):
        """Generate a text key reference map for reconciling with, rather
        than a text filter.
        """
        repo = self._pack_collection.repo
        refs = repo._find_text_key_references_from_xml_inventory_lines(
            inv_lines)
        self._text_refs = refs
        # during reconcile we:
        # - convert unreferenced texts to full texts
        # - correct texts which reference a text not copied to be full texts
        # - copy all others as-is but with corrected parents.
        # - so at this point we don't know enough to decide what becomes a full
        #   text.
        self._text_filter = None

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        self.pb.update("Copying content texts", 3)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
            _1, key, _2, refs in
            self.new_pack.revision_index.iter_all_entries()])
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
        # 2) generate a text_nodes list that contains all the deltas that can
        # be used as-is, with corrected parents.
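        # Each node handled below is a GraphIndex tuple of
        # (index, key, value, (parent_refs, compression_parent_refs)):
        # node[1] is the (file_id, revision_id) key and node[3][0] holds the
        # parent references that are compared against ideal_parents.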
        ok_nodes = []
        bad_texts = []
        discarded_nodes = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        text_index_map, text_nodes = self._get_text_nodes()
        for node in text_nodes:
            try:
                ideal_parents = tuple(ideal_index[node[1]])
            except KeyError:
                discarded_nodes.append(node)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                if ideal_parents == node[3][0]:
                    ok_nodes.append(node)
                elif ideal_parents[0:1] == node[3][0][0:1]:
                    # the left most parent is the same, or there are no parents
                    # today. Either way, we can preserve the representation as
                    # long as we change the refs to be inserted.
                    self._data_changed = True
                    ok_nodes.append((node[0], node[1], node[2],
                        (ideal_parents, node[3][1])))
                    self._data_changed = True
                else:
                    # Reinsert this text completely
                    bad_texts.append((node[1], ideal_parents))
                    self._data_changed = True
        # we're finished with some data.
        # 3) bulk copy the ok data
        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        # 4) adhoc copy all the other texts.
        # We have to topologically insert all texts otherwise we can fail to
        # reconcile when parts of a single delta chain are preserved intact,
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
        # reinserted, and if d3 has incorrect parents it will also be
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
        # copied), so we will try to delta, but d2 is not currently able to be
        # extracted because its basis d1 is not present. Topologically sorting
        # addresses this. The following generates a sort for all the texts that
        # are being inserted without having to reference the entire text key
        # space (we only topo sort the revisions, which is smaller).
        topo_order = tsort.topo_sort(ancestors)
        rev_order = dict(zip(topo_order, range(len(topo_order))))
        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
        transaction = repo.get_transaction()
        file_id_index = GraphIndexPrefixAdapter(
            self.new_pack.text_index,
            add_nodes_callback=self.new_pack.text_index.add_nodes)
        knit_index = KnitGraphIndex(file_id_index,
            add_callback=file_id_index.add_nodes,
            deltas=True, parents=True)
        output_knit = knit.KnitVersionedFile('reconcile-texts',
            self._pack_collection.transport,
            access_method=_PackAccess(
                {self.new_pack.text_index:self.new_pack.access_tuple()},
                (self.new_pack._writer, self.new_pack.text_index)),
            factory=knit.KnitPlainFactory())
        for key, parent_keys in bad_texts:
            # We refer back to the new pack to delta the data being output.
            # A possible improvement would be to catch errors on short reads
            # and only flush then.
            self.new_pack.flush()
            parents = []
            for parent_key in parent_keys:
                if parent_key[0] != key[0]:
                    # Graph parents must match the fileid
                    raise errors.BzrError('Mismatched key parent %r:%r' %
                        (key, parent_keys))
                parents.append(parent_key[1])
            source_weave = repo.weave_store.get_weave(key[0], transaction)
            text_lines = source_weave.get_lines(key[1])
            # adapt the 'knit' to the current file_id.
            file_id_index = GraphIndexPrefixAdapter(
                self.new_pack.text_index,
                add_nodes_callback=self.new_pack.text_index.add_nodes)
            knit_index._graph_index = file_id_index
            knit_index._add_callback = file_id_index.add_nodes
            output_knit.add_lines_with_ghosts(
                key[1], parents, text_lines, random_id=True, check_content=False)
        # 5) check that nothing inserted has a reference outside the keyspace.
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
        if missing_text_keys:
            raise errors.BzrError('Reference to missing compression parents %r'
                % (missing_text_keys,))
        self._log_copied_texts()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        # XXX: we might be better checking this at the copy time.
        original_inventory_keys = set()
        inv_index = self._pack_collection.inventory_index.combined_index
        for entry in inv_index.iter_all_entries():
            original_inventory_keys.add(entry[1])
        new_inventory_keys = set()
        for entry in new_pack.inventory_index.iter_all_entries():
            new_inventory_keys.add(entry[1])
        if new_inventory_keys != original_inventory_keys:
            self._data_changed = True
        return new_pack.data_inserted() and self._data_changed


class RepositoryPackCollection(object):
    """Management of packs within a repository."""