@@ -73 +76 @@
     added text, reducing memory and object pressure.
     """
+
+    def __init__(self, repository, parents, config, timestamp=None,
+                 timezone=None, committer=None, revprops=None,
+                 revision_id=None):
+        CommitBuilder.__init__(self, repository, parents, config,
+            timestamp=timestamp, timezone=timezone, committer=committer,
+            revprops=revprops, revision_id=revision_id)
+        self._file_graph = graph.Graph(
+            repository._pack_collection.text_index.combined_index)
 
     def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
         return self.repository._pack_collection._add_text_to_weave(file_id,
             self._new_revision_id, new_lines, parents, nostore_sha,
             self.random_revid)
+
+    def _heads(self, file_id, revision_ids):
+        keys = [(file_id, revision_id) for revision_id in revision_ids]
+        return set([key[1] for key in self._file_graph.heads(keys)])
 
 
 class PackRootCommitBuilder(RootCommitBuilder):
     """A subclass of RootCommitBuilder to add texts with pack semantics.
 
     added text, reducing memory and object pressure.
     """
+
+    def __init__(self, repository, parents, config, timestamp=None,
+                 timezone=None, committer=None, revprops=None,
+                 revision_id=None):
+        CommitBuilder.__init__(self, repository, parents, config,
+            timestamp=timestamp, timezone=timezone, committer=committer,
+            revprops=revprops, revision_id=revision_id)
+        self._file_graph = graph.Graph(
+            repository._pack_collection.text_index.combined_index)
 
     def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
         return self.repository._pack_collection._add_text_to_weave(file_id,
             self._new_revision_id, new_lines, parents, nostore_sha,
             self.random_revid)
+
+    def _heads(self, file_id, revision_ids):
+        keys = [(file_id, revision_id) for revision_id in revision_ids]
+        return set([key[1] for key in self._file_graph.heads(keys)])
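
Both builders delegate _heads to a bzrlib.graph.Graph built over the text index, keyed by (file_id, revision_id) pairs. For readers new to the idea: the heads of a set of candidate revisions are the candidates that are not ancestors of any other candidate. A self-contained sketch of that semantics (toy dict ancestry and illustrative names, not the bzrlib Graph API):

    # Toy per-file ancestry: revision -> tuple of parent revisions.
    ancestry = {
        'rev-a': (),
        'rev-b': ('rev-a',),
        'rev-c': ('rev-a',),
        'rev-d': ('rev-b', 'rev-c'),
    }

    def heads(candidates):
        # A candidate is a head unless it is an ancestor of another candidate.
        reachable = set()
        for start in candidates:
            todo = list(ancestry[start])
            while todo:
                rev = todo.pop()
                if rev not in reachable:
                    reachable.add(rev)
                    todo.extend(ancestry[rev])
        return set(candidates) - reachable

    assert heads(['rev-b', 'rev-c']) == set(['rev-b', 'rev-c'])
    assert heads(['rev-a', 'rev-d']) == set(['rev-d'])

The commit builders use this to decide which per-file revisions are the live heads when computing the parents of a newly added text.
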
 
 
 class Pack(object):
     """An in memory proxy for a pack and its indices.
 
@@ -541 +598 @@
             self._pack_collection._pack_transport, upload_suffix=self.suffix,
             file_mode=self._pack_collection.repo.control_files._file_mode)
 
-    def _create_pack_from_packs(self):
-        self.pb.update("Opening pack", 0, 5)
-        new_pack = self.open_pack()
-        # buffer data - we won't be reading-back during the pack creation and
-        # this makes a significant difference on sftp pushes.
-        new_pack.set_write_cache_size(1024*1024)
-        if 'pack' in debug.debug_flags:
-            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
-                for a_pack in self.packs]
-            if self.revision_ids is not None:
-                rev_count = len(self.revision_ids)
-            mutter('%s: create_pack: creating pack from source packs: '
-                '%s%s %s revisions wanted %s t=0',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                plain_pack_list, rev_count)
+    def _copy_revision_texts(self):
+        """Copy revision data to the new pack."""
         # select revisions
         if self.revision_ids:
             revision_keys = [(revision_id,) for revision_id in self.revision_ids]
         else:
             revision_keys = None
         # select revision keys
         revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
             self.packs, 'revision_index')[0]
         revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         self.pb.update("Copying revision texts", 1)
-        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
-            new_pack._writer, new_pack.revision_index))
+        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                new_pack.revision_index.key_count(),
-                time.time() - new_pack.start_time)
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.revision_index.key_count(),
+                time.time() - self.new_pack.start_time)
+        self._revision_keys = revision_keys
+
+    def _copy_inventory_texts(self):
+        """Copy the inventory texts to the new pack.
+
+        self._revision_keys is used to determine what inventories to copy.
+
+        Sets self._text_filter appropriately.
+        """
         # select inventory keys
-        inv_keys = revision_keys # currently the same keyspace, and note that
+        inv_keys = self._revision_keys # currently the same keyspace, and note that
         # querying for keys here could introduce a bug where an inventory item
         # is missed, so do not change it to query separately without cross
         # checking like the text key check below.
         # XXX: Should be a helper function to allow different inv representation
         self.pb.update("Copying inventory texts", 2)
-        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
-            new_pack._writer, new_pack.inventory_index, output_lines=True)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=True)
         if self.revision_ids:
-            fileid_revisions = self._pack_collection.repo._find_file_ids_from_xml_inventory_lines(
-                inv_lines, self.revision_ids)
-            for fileid, file_revids in fileid_revisions.iteritems():
-                    [(fileid, file_revid) for file_revid in file_revids])
+            self._process_inventory_lines(inv_lines)
         else:
             # eat the iterator to cause it to execute.
             list(inv_lines)
+            self._text_filter = None
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                new_pack.inventory_index.key_count(),
-                time.time() - new_pack.start_time)
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.inventory_index.key_count(),
+                time.time() - self.new_pack.start_time)
+
+    def _copy_text_texts(self):
         # select text keys
-        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
-            self.packs, 'text_index')[0]
-        text_nodes = self._pack_collection._index_contents(text_index_map, text_filter)
-        if text_filter is not None:
+        text_index_map, text_nodes = self._get_text_nodes()
+        if self._text_filter is not None:
             # We could return the keys copied as part of the return value from
             # _copy_nodes_graph but this doesn't work all that well with the
             # need to get line output too, so we check separately, and as we're
                     a_missing_key[0])
         # copy text keys and adjust values
         self.pb.update("Copying content texts", 3)
-        list(self._copy_nodes_graph(text_nodes, text_index_map,
-            new_pack._writer, new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
+        self._log_copied_texts()
+
+    def _check_references(self):
+        """Make sure our external references are present."""
+        external_refs = self.new_pack._external_compression_parents_of_texts()
+        index = self._pack_collection.text_index.combined_index
+        found_items = list(index.iter_entries(external_refs))
+        if len(found_items) != len(external_refs):
+            found_keys = set(k for idx, k, refs, value in found_items)
+            missing_items = external_refs - found_keys
+            missing_file_id, missing_revision_id = missing_items.pop()
+            raise errors.RevisionNotPresent(missing_revision_id,
+                missing_file_id)
+
+    def _create_pack_from_packs(self):
+        self.pb.update("Opening pack", 0, 5)
+        self.new_pack = self.open_pack()
+        new_pack = self.new_pack
+        # buffer data - we won't be reading-back during the pack creation and
+        # this makes a significant difference on sftp pushes.
+        new_pack.set_write_cache_size(1024*1024)
         if 'pack' in debug.debug_flags:
-            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
+            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
+                for a_pack in self.packs]
+            if self.revision_ids is not None:
+                rev_count = len(self.revision_ids)
+            mutter('%s: create_pack: creating pack from source packs: '
+                '%s%s %s revisions wanted %s t=0',
                 time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                new_pack.text_index.key_count(),
-                time.time() - new_pack.start_time)
+                plain_pack_list, rev_count)
+        self._copy_revision_texts()
+        self._copy_inventory_texts()
+        self._copy_text_texts()
         # select signature keys
-        signature_filter = revision_keys # same keyspace
+        signature_filter = self._revision_keys # same keyspace
         signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
             self.packs, 'signature_index')[0]
         signature_nodes = self._pack_collection._index_contents(signature_index_map,
             signature_filter)
 
@@ -719 +802 @@
         pb = ui.ui_factory.nested_progress_bar()
-            return self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb)
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
+            # Python 2.4 does not permit try:finally: in a generator.
 
-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
         # for record verification
         knit_data = _KnitData(None)
         # for line extraction when requested (inventories only)
         factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
             transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
+            reader = pack.make_readv_reader(transport, path, readv_vector)
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
                 raw_data = read_func(None)
                 version_id = key[-1]
                 pb.update("Copied record", record_index)
                 record_index += 1
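
The rewritten loop relies on an ordering invariant from the group producer: each readv_vector is sorted once when the group is built, and node_vector is kept parallel to it, so the records streamed back can be paired positionally with their metadata by izip. A minimal, self-contained illustration of that pairing (toy byte store and readv, not the bzrlib transport or pack reader API):

    from itertools import izip

    # One group as described above: parallel readv and node vectors.
    readv_vector = [(0, 5), (5, 7), (12, 3)]      # (offset, length) pairs
    node_vector = [(('f1', 'r1'), 'N', ()),       # (key, eol_flag, references)
                   (('f1', 'r2'), ' ', ()),
                   (('f2', 'r1'), 'N', ())]
    pack_bytes = 'AAAAABBBBBBBCCC'

    def readv(requests):
        # Toy readv: return the requested ranges in request order.
        return [pack_bytes[offset:offset + length] for offset, length in requests]

    copied = []
    for raw_data, (key, eol_flag, references) in izip(readv(readv_vector),
                                                      node_vector):
        copied.append((key, raw_data))
    assert copied == [(('f1', 'r1'), 'AAAAA'),
                      (('f1', 'r2'), 'BBBBBBB'),
                      (('f2', 'r1'), 'CCC')]
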
 
+    def _get_text_nodes(self):
+        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+            self.packs, 'text_index')[0]
+        return text_index_map, self._pack_collection._index_contents(text_index_map,
+            self._text_filter)
+
+    def _least_readv_node_readv(self, nodes):
+        """Generate request groups for nodes using the least readv's.
+
+        :param nodes: An iterable of graph index nodes.
+        :return: Total node count and an iterator of the data needed to perform
+            readvs to obtain the data for nodes. Each item yielded by the
+            iterator is a tuple with:
+            index, readv_vector, node_vector. readv_vector is a list ready to
+            hand to the transport readv method, and node_vector is a list of
+            (key, eol_flag, references) for the node retrieved by the
+            matching readv_vector.
+        """
+        # group by pack so we do one readv per pack
+        nodes = sorted(nodes)
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append(
+                    ((offset, length), (key, value[0], references)))
+            # linear scan up the pack to maximum range combining.
+            pack_readv_requests.sort()
+            # split out the readv and the node data.
+            pack_readv = [readv for readv, node in pack_readv_requests]
+            node_vector = [node for readv, node in pack_readv_requests]
+            result.append((index, pack_readv, node_vector))
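
A note on the value parsing above: in a KnitGraphIndex node the value string carries a one-byte no-eol flag followed by ascii 'offset length' coordinates into the pack, which is why value[1:].split(' ') yields the readv request and value[0] travels along with the node. The following condensed, runnable restatement of the grouping shows the shape of the return value; it adds the request_groups/total/result initialisation the excerpt above elides, and uses toy index names rather than real GraphIndex objects:

    def least_readv_node_readv(nodes):
        # nodes are (index, key, value, references) tuples from a GraphIndex.
        request_groups = {}
        result = []
        total = 0
        for index, key, value, references in sorted(nodes):
            request_groups.setdefault(index, []).append((key, value, references))
            total += 1
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                bits = value[1:].split(' ')      # value[0] is the no-eol flag
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
            pack_readv_requests.sort()           # one ascending scan per pack
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
        return total, result

    nodes = [('idx-1', ('f1', 'r2'), 'N100 20', ()),
             ('idx-1', ('f1', 'r1'), ' 0 100', ()),
             ('idx-2', ('f2', 'r1'), 'N0 17', ())]
    total, groups = least_readv_node_readv(nodes)
    assert total == 3
    assert sorted(groups)[0] == ('idx-1',
        [(0, 100), (100, 20)],
        [(('f1', 'r1'), ' ', ()), (('f1', 'r2'), 'N', ())])
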
+
+    def _log_copied_texts(self):
+        if 'pack' in debug.debug_flags:
+            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.text_index.key_count(),
+                time.time() - self.new_pack.start_time)
+
+    def _process_inventory_lines(self, inv_lines):
+        """Use up the inv_lines generator and set up a text key filter."""
+        repo = self._pack_collection.repo
+        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
+            inv_lines, self.revision_ids)
+        text_filter = []
+        for fileid, file_revids in fileid_revisions.iteritems():
+            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
+        self._text_filter = text_filter
+
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        return self._least_readv_node_readv(revision_nodes)
+
+    def _use_pack(self, new_pack):
+        """Return True if new_pack should be used.
+
+        :param new_pack: The pack that has just been created.
+        :return: True if the pack should be used.
+        """
+        return new_pack.data_inserted()
+
+
+class OptimisingPacker(Packer):
+    """A packer which spends more time to create better disk layouts."""
+
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        This sort places revisions in topological order with the ancestors
+        after the children.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        # build an ancestors dict
+        for index, key, value, references in revision_nodes:
+            ancestors[key] = references[0]
+            by_key[key] = (index, value, references)
+        order = tsort.topo_sort(ancestors)
+        # Single IO is pathological, but it will work as a starting point.
+        for key in reversed(order):
+            index, value, references = by_key[key]
+            # ---- KnitGraphIndex.get_position
+            bits = value[1:].split(' ')
+            offset, length = int(bits[0]), int(bits[1])
+            requests.append(
+                (index, [(offset, length)], [(key, value[0], references)]))
+        # TODO: combine requests in the same index that are in ascending order.
+        return total, requests
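
The ordering trick here: tsort.topo_sort emits parents before children, and the loop above walks it reversed, so requests are issued newest-first and a forward scan of the new pack meets a child before its ancestors; one readv per revision is accepted as a simple starting point. A self-contained sketch of that property (a naive quadratic sort standing in for bzrlib.tsort, acyclic input assumed):

    def topo_sort(ancestors):
        # Parents-before-children order for an acyclic {key: parents} dict.
        order, done = [], set()
        remaining = sorted(ancestors)
        while remaining:
            deferred = []
            for key in remaining:
                if all(parent in done for parent in ancestors[key]):
                    order.append(key)
                    done.add(key)
                else:
                    deferred.append(key)
            remaining = deferred
        return order

    ancestors = {'r1': (), 'r2': ('r1',), 'r3': ('r2',)}
    order = topo_sort(ancestors)
    assert order == ['r1', 'r2', 'r3']
    # reversed(order) is the write order: children first, ancestors after.
    assert list(reversed(order)) == ['r3', 'r2', 'r1']
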
 
 
 class ReconcilePacker(Packer):
     """A packer which regenerates indices etc as it copies.
     """
 
+    def _extra_init(self):
+        self._data_changed = False
+
+    def _process_inventory_lines(self, inv_lines):
+        """Generate a text key reference map, rather than a filter, for reconciling."""
+        repo = self._pack_collection.repo
+        refs = repo._find_text_key_references_from_xml_inventory_lines(
+            inv_lines)
+        self._text_refs = refs
+        # during reconcile we:
+        # - convert unreferenced texts to full texts
+        # - correct texts which reference a text not copied to be full texts
+        # - copy all others as-is but with corrected parents.
+        # - so at this point we don't know enough to decide what becomes a full
+        #   text.
+        self._text_filter = None
+
+    def _copy_text_texts(self):
+        """Generate what texts we should have and then copy."""
+        self.pb.update("Copying content texts", 3)
+        # we have three major tasks here:
+        # 1) generate the ideal index
+        repo = self._pack_collection.repo
+        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
+            _1, key, _2, refs in
+            self.new_pack.revision_index.iter_all_entries()])
+        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
+        # 2) generate a text_nodes list that contains all the deltas that can
+        # be used as-is, with corrected parents.
+        discarded_nodes = []
+        NULL_REVISION = _mod_revision.NULL_REVISION
+        text_index_map, text_nodes = self._get_text_nodes()
+        for node in text_nodes:
+                ideal_parents = tuple(ideal_index[node[1]])
+                discarded_nodes.append(node)
+                self._data_changed = True
+                if ideal_parents == (NULL_REVISION,):
+                if ideal_parents == node[3][0]:
+                    ok_nodes.append(node)
+                elif ideal_parents[0:1] == node[3][0][0:1]:
+                    # the left most parent is the same, or there are no parents
+                    # today. Either way, we can preserve the representation as
+                    # long as we change the refs to be inserted.
+                    self._data_changed = True
+                    ok_nodes.append((node[0], node[1], node[2],
+                        (ideal_parents, node[3][1])))
+                    self._data_changed = True
+                    # Reinsert this text completely
+                    bad_texts.append((node[1], ideal_parents))
+                    self._data_changed = True
+        # we're finished with some data.
+        # 3) bulk copy the ok data
+        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
+        # 4) ad hoc copy all the other texts.
+        # We have to topologically insert all texts otherwise we can fail to
+        # reconcile when parts of a single delta chain are preserved intact,
+        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
+        # reinserted, and if d3 has incorrect parents it will also be
+        # reinserted. If we insert d3 first, d2 is present (as it was bulk
+        # copied), so we will try to delta, but d2 is not currently able to be
+        # extracted because its basis d1 is not present. Topologically sorting
+        # addresses this. The following generates a sort for all the texts that
+        # are being inserted without having to reference the entire text key
+        # space (we only topo sort the revisions, which is smaller).
+        topo_order = tsort.topo_sort(ancestors)
+        rev_order = dict(zip(topo_order, range(len(topo_order))))
+        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
+        transaction = repo.get_transaction()
+        file_id_index = GraphIndexPrefixAdapter(
+            self.new_pack.text_index,
+            add_nodes_callback=self.new_pack.text_index.add_nodes)
+        knit_index = KnitGraphIndex(file_id_index,
+            add_callback=file_id_index.add_nodes,
+            deltas=True, parents=True)
+        output_knit = knit.KnitVersionedFile('reconcile-texts',
+            self._pack_collection.transport,
+            access_method=_PackAccess(
+                {self.new_pack.text_index:self.new_pack.access_tuple()},
+                (self.new_pack._writer, self.new_pack.text_index)),
+            factory=knit.KnitPlainFactory())
+        for key, parent_keys in bad_texts:
+            # We refer to the new pack to delta against data being output.
+            # A possible improvement would be to catch errors on short reads
+            # and only flush then.
+            self.new_pack.flush()
+            parents = []
+            for parent_key in parent_keys:
+                if parent_key[0] != key[0]:
+                    # Graph parents must match the fileid
+                    raise errors.BzrError('Mismatched key parent %r:%r' %
+                        (key, parent_keys))
+                parents.append(parent_key[1])
+            source_weave = repo.weave_store.get_weave(key[0], transaction)
+            text_lines = source_weave.get_lines(key[1])
+            # adapt the 'knit' to the current file_id.
+            file_id_index = GraphIndexPrefixAdapter(
+                self.new_pack.text_index,
+                add_nodes_callback=self.new_pack.text_index.add_nodes)
+            knit_index._graph_index = file_id_index
+            knit_index._add_callback = file_id_index.add_nodes
+            output_knit.add_lines_with_ghosts(
+                key[1], parents, text_lines, random_id=True, check_content=False)
+        # 5) check that nothing inserted has a reference outside the keyspace.
+        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
+        if missing_text_keys:
+            raise errors.BzrError('Reference to missing compression parents %r'
+                % (missing_text_keys,))
+        self._log_copied_texts()
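
The topological-insertion comment above (step 4) is the subtle part of this method: a reinserted text may be deltaed against a basis that is itself being reinserted, so every basis must be written before anything compressed against it. Sorting bad_texts by the topological rank of each text's revision achieves that. A small sketch of the sort key, using the ((file_id, rev_id), ideal_parents) pair layout from the code above:

    topo_order = ['r1', 'r2', 'r3']      # ancestors first, as tsort emits
    rev_order = dict(zip(topo_order, range(len(topo_order))))

    bad_texts = [(('f1', 'r3'), ()),     # (text key, ideal parents)
                 (('f2', 'r1'), ()),
                 (('f1', 'r2'), ())]
    # Sort by the topological rank of each text's revision id (key[0][1]).
    bad_texts.sort(key=lambda key: rev_order[key[0][1]])
    assert [key for key, parents in bad_texts] == [
        ('f2', 'r1'), ('f1', 'r2'), ('f1', 'r3')]
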
+
+    def _use_pack(self, new_pack):
+        """Override _use_pack to check for reconcile having changed content."""
+        # XXX: we might be better checking this at the copy time.
+        original_inventory_keys = set()
+        inv_index = self._pack_collection.inventory_index.combined_index
+        for entry in inv_index.iter_all_entries():
+            original_inventory_keys.add(entry[1])
+        new_inventory_keys = set()
+        for entry in new_pack.inventory_index.iter_all_entries():
+            new_inventory_keys.add(entry[1])
+        if new_inventory_keys != original_inventory_keys:
+            self._data_changed = True
+        return new_pack.data_inserted() and self._data_changed
 
 
 class RepositoryPackCollection(object):
     """Management of packs within a repository."""
 
@@ -1573 +1899 @@
+    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
     def get_parents(self, revision_ids):
-        """See StackedParentsProvider.get_parents.
+        """See graph._StackedParentsProvider.get_parents."""
+        parent_map = self.get_parent_map(revision_ids)
+        return [parent_map.get(r, None) for r in revision_ids]
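
The contract difference between the two APIs matters to callers: get_parent_map returns a dict that silently omits revisions it cannot find, while the deprecated list API padded missing entries with None in input order, which is exactly what the shim above reconstructs. A toy model of the two behaviours (hypothetical known dict, for illustration only):

    def get_parent_map(keys):
        # Dict API: absent revisions are simply left out of the result.
        known = {'rev-1': (), 'rev-2': ('rev-1',)}
        return dict((key, known[key]) for key in keys if key in known)

    def get_parents(revision_ids):
        # Deprecated list API: one entry per query, None for unknowns.
        parent_map = get_parent_map(revision_ids)
        return [parent_map.get(r, None) for r in revision_ids]

    assert get_parent_map(['rev-2', 'ghost']) == {'rev-2': ('rev-1',)}
    assert get_parents(['rev-2', 'ghost']) == [('rev-1',), None]
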
+
+    def get_parent_map(self, keys):
+        """See graph._StackedParentsProvider.get_parent_map
 
         This implementation accesses the combined revision index to provide
         answers.
         """
         self._pack_collection.ensure_loaded()
         index = self._pack_collection.revision_index.combined_index
-        for revision_id in revision_ids:
-            if revision_id != _mod_revision.NULL_REVISION:
-                search_keys.add((revision_id,))
-        found_parents = {_mod_revision.NULL_REVISION:[]}
+        if _mod_revision.NULL_REVISION in keys:
+            keys.discard(_mod_revision.NULL_REVISION)
+            found_parents = {_mod_revision.NULL_REVISION:()}
+        else:
+            found_parents = {}
+        search_keys = set((revision_id,) for revision_id in keys)
         for index, key, value, refs in index.iter_entries(search_keys):
             parents = refs[0]
             if not parents: