    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])
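# A minimal sketch (illustration only, not bzrlib code) of what the _heads
# query above computes: among several candidate revisions of one file, keep
# only those that are not ancestors of another candidate. The toy parent_map
# stands in for graph.Graph over the combined text index.
def _toy_heads(parent_map, candidates):
    heads = set(candidates)
    for candidate in candidates:
        # walk candidate's ancestry; anything reachable is not a head
        stack = list(parent_map.get(candidate, ()))
        seen = set()
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            heads.discard(node)
            stack.extend(parent_map.get(node, ()))
    return heads

# _toy_heads({'A': (), 'B': ('A',), 'C': ('A',)}, ['A', 'B', 'C'])
# -> set(['B', 'C']): A is an ancestor of both B and C, so it is dominated.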
class PackRootCommitBuilder(RootCommitBuilder):
    """A subclass of RootCommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])
class Pack(object):
    """An in memory proxy for a pack and its indices.
    def open_pack(self):
        """Open a pack for the pack we are creating."""
        return NewPack(self._pack_collection._upload_transport,
            self._pack_collection._index_transport,
            self._pack_collection._pack_transport, upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.control_files._file_mode)
    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys
    def _copy_inventory_texts(self):
        """Copy the inventory texts to the new pack.

        self._revision_keys is used to determine what inventories to copy.

        Sets self._text_filter appropriately.
        """
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'inventory_index')[0]
        inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=True)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)
    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing records.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            text_keys = set(self._text_filter)
            missing_text_keys = text_keys.difference(present_text_keys)
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()
    def _check_references(self):
        """Make sure our external references are present."""
        external_refs = self.new_pack._external_compression_parents_of_texts()
        if external_refs:
            index = self._pack_collection.text_index.combined_index
            found_items = list(index.iter_entries(external_refs))
            if len(found_items) != len(external_refs):
                found_keys = set(k for idx, k, refs, value in found_items)
                missing_items = external_refs - found_keys
                missing_file_id, missing_revision_id = missing_items.pop()
                raise errors.RevisionNotPresent(missing_revision_id,
                    missing_file_id)
def _create_pack_from_packs(self):
701
self.pb.update("Opening pack", 0, 5)
702
self.new_pack = self.open_pack()
703
new_pack = self.new_pack
704
# buffer data - we won't be reading-back during the pack creation and
705
# this makes a significant difference on sftp pushes.
706
new_pack.set_write_cache_size(1024*1024)
628
707
if 'pack' in debug.debug_flags:
629
mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
708
plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
709
for a_pack in self.packs]
710
if self.revision_ids is not None:
711
rev_count = len(self.revision_ids)
714
mutter('%s: create_pack: creating pack from source packs: '
715
'%s%s %s revisions wanted %s t=0',
630
716
time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
631
new_pack.text_index.key_count(),
632
time.time() - new_pack.start_time)
717
plain_pack_list, rev_count)
718
self._copy_revision_texts()
719
self._copy_inventory_texts()
720
self._copy_text_texts()
633
721
# select signature keys
634
signature_filter = revision_keys # same keyspace
722
signature_filter = self._revision_keys # same keyspace
635
723
signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
636
724
self.packs, 'signature_index')[0]
637
725
signature_nodes = self._pack_collection._index_contents(signature_index_map,
    def _copy_nodes_graph(self, index_map, writer, write_index,
        readv_group_iter, total_items, output_lines=False):
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()
    def _do_copy_nodes_graph(self, index_map, writer, write_index,
        output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit_data = _KnitData(None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = knit.KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            # copy the data
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path, readv_vector)
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                version_id = key[-1]
                # ... (record verification, optional line extraction and the
                # write into the new pack are elided here) ...
                pb.update("Copied record", record_index)
                record_index += 1
    def _get_text_nodes(self):
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'text_index')[0]
        return text_index_map, self._pack_collection._index_contents(text_index_map,
            self._text_filter)
    def _least_readv_node_readv(self, nodes):
        """Generate request groups for nodes using the least readv's.

        :param nodes: An iterable of graph index nodes.
        :return: Total node count and an iterator of the data needed to perform
            readvs to obtain the data for nodes. Each item yielded by the
            iterator is a tuple with:
            index, readv_vector, node_vector. readv_vector is a list ready to
            hand to the transport readv method, and node_vector is a list of
            (key, eol_flag, references) for the node retrieved by the
            matching readv_vector.
        """
        # group by pack so we do one readv per pack
        nodes = sorted(nodes)
        total = len(nodes)
        request_groups = {}
        result = []
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
            # linear scan up the pack to maximum range combining.
            pack_readv_requests.sort()
            # split out the readv and the node data.
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
        return total, result
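    # Worked example (hypothetical nodes): each index value encodes
    # '<eol_flag><offset> <length>', so
    #     nodes = [(idx_a, ('f1', 'r1'), 'N100 20', refs1),
    #              (idx_a, ('f2', 'r9'), 'N40 10', refs2)]
    # groups to a single entry for idx_a with the ranges sorted by offset:
    #     readv_vector = [(40, 10), (100, 20)]
    #     node_vector  = [(('f2', 'r9'), 'N', refs2), (('f1', 'r1'), 'N', refs1)]
    # i.e. one readv call per source pack, reading records in disk order.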
    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)
    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and setup a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_ids)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter
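    # Worked example (hypothetical ids): an inventory scan yielding
    #     fileid_revisions == {'f1': ['r1', 'r2'], 'f2': ['r2']}
    # produces the flat text key filter
    #     self._text_filter == [('f1', 'r1'), ('f1', 'r2'), ('f2', 'r2')]
    # which later restricts _copy_text_texts to exactly those texts.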
    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        return self._least_readv_node_readv(revision_nodes)
    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.

        :param new_pack: The pack that has just been created.
        :return: True if the pack should be used.
        """
        return new_pack.data_inserted()
class OptimisingPacker(Packer):
    """A packer which spends more time to create better disk layouts."""

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        This sort places revisions in topological order with the ancestors
        after the children.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        # build an ancestors dict
        ancestors = {}
        by_key = {}
        for index, key, value, references in revision_nodes:
            ancestors[key] = references[0]
            by_key[key] = (index, value, references)
        order = tsort.topo_sort(ancestors)
        total = len(order)
        # Single IO is pathological, but it will work as a starting point.
        requests = []
        for key in reversed(order):
            index, value, references = by_key[key]
            # ---- KnitGraphIndex.get_position
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            requests.append(
                (index, [(offset, length)], [(key, value[0], references)]))
        # TODO: combine requests in the same index that are in ascending order.
        return total, requests
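# Sketch of the ordering effect (toy ancestry and a plain depth-first
# topological sort standing in for bzrlib's tsort.topo_sort): ancestors come
# out before children, so reversed(order) writes each child before the
# ancestors it deltas against.
def _toy_topo_sort(ancestors):
    order = []
    seen = set()
    def visit(node):
        if node in seen:
            return
        seen.add(node)
        for parent in ancestors.get(node, ()):
            visit(parent)
        order.append(node)
    for node in ancestors:
        visit(node)
    return order

# _toy_topo_sort({'r3': ('r2',), 'r2': ('r1',), 'r1': ()})
# -> ['r1', 'r2', 'r3']; reversed: r3, r2, r1 - newest first in the new pack.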
class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    regenerated.
    """

    def _extra_init(self):
        self._data_changed = False

    def _process_inventory_lines(self, inv_lines):
        """Generate a text key reference map rather than a text filter, for reconciling."""
        repo = self._pack_collection.repo
        refs = repo._find_text_key_references_from_xml_inventory_lines(
            inv_lines)
        self._text_refs = refs
        # during reconcile we:
        #  - convert unreferenced texts to full texts
        #  - correct texts which reference a text not copied to be full texts
        #  - copy all others as-is but with corrected parents.
        #  - so at this point we don't know enough to decide what becomes a full
        #    text.
        self._text_filter = None
    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        self.pb.update("Copying content texts", 3)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
            (_1, key, _2, refs) in
            self.new_pack.revision_index.iter_all_entries()])
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
        # 2) generate a text_nodes list that contains all the deltas that can
        #    be used as-is, with corrected parents.
        ok_nodes = []
        bad_texts = []
        discarded_nodes = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        text_index_map, text_nodes = self._get_text_nodes()
        for node in text_nodes:
            # 0 - index
            # 1 - key
            # 2 - value
            # 3 - refs
            try:
                ideal_parents = tuple(ideal_index[node[1]])
            except KeyError:
                discarded_nodes.append(node)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                if ideal_parents == node[3][0]:
                    # no change needed.
                    ok_nodes.append(node)
                elif ideal_parents[0:1] == node[3][0][0:1]:
                    # the left most parent is the same, or there are no parents
                    # today. Either way, we can preserve the representation as
                    # long as we change the refs to be inserted.
                    self._data_changed = True
                    ok_nodes.append((node[0], node[1], node[2],
                        (ideal_parents, node[3][1])))
                else:
                    # Reinsert this text completely
                    bad_texts.append((node[1], ideal_parents))
                    self._data_changed = True
        # we're finished with some data.
        del ideal_index
        del text_nodes
        # 3) bulk copy the ok data
        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        # 4) adhoc copy all the other texts.
        # We have to topologically insert all texts otherwise we can fail to
        # reconcile when parts of a single delta chain are preserved intact,
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
        # reinserted, and if d3 has incorrect parents it will also be
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
        # copied), so we will try to delta, but d2 is not currently able to be
        # extracted because its basis d1 is not present. Topologically sorting
        # addresses this. The following generates a sort for all the texts that
        # are being inserted without having to reference the entire text key
        # space (we only topo sort the revisions, which is smaller).
        topo_order = tsort.topo_sort(ancestors)
        rev_order = dict(zip(topo_order, range(len(topo_order))))
        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
        transaction = repo.get_transaction()
        file_id_index = GraphIndexPrefixAdapter(
            self.new_pack.text_index,
            ('blank', ), 1,
            add_nodes_callback=self.new_pack.text_index.add_nodes)
        knit_index = KnitGraphIndex(file_id_index,
            add_callback=file_id_index.add_nodes,
            deltas=True, parents=True)
        output_knit = knit.KnitVersionedFile('reconcile-texts',
            self._pack_collection.transport,
            None,
            index=knit_index,
            access_method=_PackAccess(
                {self.new_pack.text_index:self.new_pack.access_tuple()},
                (self.new_pack._writer, self.new_pack.text_index)),
            factory=knit.KnitPlainFactory())
        for key, parent_keys in bad_texts:
            # The new data is deltaed against texts already written to the
            # new pack, so flush first to make them readable.
            # A possible improvement would be to catch errors on short reads
            # and only flush then.
            self.new_pack.flush()
            parents = []
            for parent_key in parent_keys:
                if parent_key[0] != key[0]:
                    # Graph parents must match the fileid
                    raise errors.BzrError('Mismatched key parent %r:%r' %
                        (key, parent_keys))
                parents.append(parent_key[1])
            source_weave = repo.weave_store.get_weave(key[0], transaction)
            text_lines = source_weave.get_lines(key[1])
            # adapt the 'knit' to the current file_id.
            file_id_index = GraphIndexPrefixAdapter(
                self.new_pack.text_index,
                (key[0], ), 1,
                add_nodes_callback=self.new_pack.text_index.add_nodes)
            knit_index._graph_index = file_id_index
            knit_index._add_callback = file_id_index.add_nodes
            output_knit.add_lines_with_ghosts(
                key[1], parents, text_lines, random_id=True, check_content=False)
        # 5) check that nothing inserted has a reference outside the keyspace.
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
        if missing_text_keys:
            raise errors.BzrError('Reference to missing compression parents %r'
                % (missing_text_keys,))
        self._log_copied_texts()
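    # Worked example of the classification above (hypothetical keys), where a
    # node's stored parents are node[3][0] and the ideal index supplies the
    # corrected parents:
    #     stored ('r1',)      ideal ('r1',) -> ok_nodes, copied as-is
    #     stored ('r1', 'r9') ideal ('r1',) -> ok_nodes with rewritten refs
    #                                          (same leftmost parent, so the
    #                                          delta basis is still valid)
    #     stored ('r9',)      ideal ('r1',) -> bad_texts, reinserted as a full
    #                                          text in topological order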
    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        # XXX: we might be better checking this at the copy time.
        original_inventory_keys = set()
        inv_index = self._pack_collection.inventory_index.combined_index
        for entry in inv_index.iter_all_entries():
            original_inventory_keys.add(entry[1])
        new_inventory_keys = set()
        for entry in new_pack.inventory_index.iter_all_entries():
            new_inventory_keys.add(entry[1])
        if new_inventory_keys != original_inventory_keys:
            self._data_changed = True
        return new_pack.data_inserted() and self._data_changed
class RepositoryPackCollection(object):
    """Management of packs within a repository."""
    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
    def get_parents(self, revision_ids):
        """See graph._StackedParentsProvider.get_parents."""
        parent_map = self.get_parent_map(revision_ids)
        return [parent_map.get(r, None) for r in revision_ids]

    def get_parent_map(self, keys):
        """See graph._StackedParentsProvider.get_parent_map

        This implementation accesses the combined revision index to provide
        answers.
        """
        self._pack_collection.ensure_loaded()
        index = self._pack_collection.revision_index.combined_index
        keys = set(keys)
        if _mod_revision.NULL_REVISION in keys:
            keys.discard(_mod_revision.NULL_REVISION)
            found_parents = {_mod_revision.NULL_REVISION:()}
        else:
            found_parents = {}
        search_keys = set((revision_id,) for revision_id in keys)
        for index, key, value, refs in index.iter_entries(search_keys):
            parents = refs[0]
            if not parents:
                parents = (_mod_revision.NULL_REVISION,)
            found_parents[key[0]] = parents
        return found_parents
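    # Worked example of the contract (hypothetical data): given a revision
    # graph {'r2': ('r1',), 'r1': ()}, then
    #     get_parent_map(['r2', 'r1', 'ghost', NULL_REVISION])
    # returns
    #     {'r2': ('r1',), 'r1': (NULL_REVISION,), NULL_REVISION: ()}
    # ghosts are simply absent from the map, and parentless revisions report
    # NULL_REVISION as their only parent.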
    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs containing knits with subtree support\n"
class RepositoryFormatKnitPack4(RepositoryFormatPack):
    """A rich-root, no subtrees parameterized Pack repository.

    This repository format uses the xml6 serializer to get:
     - support for recording full info about the tree root

    This format was introduced in 1.0.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    supports_tree_reference = False
    _serializer = xml6.serializer_v6

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(
            'rich-root-pack')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar pack repository format 1 with rich root"
                " (needs bzr 1.0)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs containing knits with rich root support\n"