    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])
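
    # Illustrative sketch (editorial note, not part of the original code):
    # Graph.heads() drops any key that is an ancestor of another candidate,
    # so _heads() collapses redundant per-file parents. With hypothetical
    # ids, assuming rev-1 is an ancestor of rev-2:
    #
    #   builder._heads('file-a', ['rev-1', 'rev-2'])
    #   # -> set(['rev-2'])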


class PackRootCommitBuilder(RootCommitBuilder):
    """A subclass of RootCommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])


class Pack(object):
    """An in memory proxy for a pack and its indices.

            self._pack_collection._pack_transport, upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.control_files._file_mode)

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
            self.new_pack._writer, self.new_pack.revision_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys
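
    # A minimal sketch of the key shapes assumed above (hypothetical ids):
    # revision ids become 1-tuples so they can be looked up in the revision
    # GraphIndex, while None means "copy every revision".
    #
    #   self.revision_ids = ['rev-1', 'rev-2']
    #   revision_keys     = [('rev-1',), ('rev-2',)]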

    def _copy_inventory_texts(self):
        """Copy the inventory texts to the new pack.

        self._revision_keys is used to determine what inventories to copy.

        Sets self._text_filter appropriately.
        """
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'inventory_index')[0]
        inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
        # copy inventory keys and adjust values
        self.pb.update("Copying inventory texts", 2)
        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)
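
    # Sketch of the "eat the iterator" idiom above (hypothetical generator):
    # _copy_nodes_graph only writes records as it is iterated, so the copy
    # must be forced even when the line output itself is unused.
    #
    #   def copy():
    #       for record in records:
    #           writer.write(record)   # the side effect we need
    #           yield record.lines
    #   list(copy())                   # drains the generator, forcing writes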

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # asking for the deserialised data, we might as well examine it.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        list(self._copy_nodes_graph(text_nodes, text_index_map,
            self.new_pack._writer, self.new_pack.text_index))
        self._log_copied_texts()

    def _check_references(self):
        """Make sure our external references are present."""
        external_refs = self.new_pack._external_compression_parents_of_texts()
        if external_refs:
            index = self._pack_collection.text_index.combined_index
            found_items = list(index.iter_entries(external_refs))
            if len(found_items) != len(external_refs):
                found_keys = set(k for idx, k, refs, value in found_items)
                missing_items = external_refs - found_keys
                missing_file_id, missing_revision_id = missing_items.pop()
                raise errors.RevisionNotPresent(missing_revision_id,
                    missing_file_id)
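
    # Illustrative sketch (assumed key shape): external_refs holds
    # (file_id, revision_id) 2-tuples naming compression parents that live
    # outside the new pack; each must resolve in the combined text index.
    #
    #   external_refs = set([('file-a', 'rev-1')])
    #   list(index.iter_entries(external_refs))   # must yield one entry here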

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'signature_index')[0]
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
            signature_filter)
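        # Sketch of the assumed index plumbing used throughout this method
        # (hypothetical values): _packs_list_to_pack_map_and_index_list()
        # returns (index-to-pack map, index list) and [0] keeps the map;
        # _index_contents() then yields (index, key, value[, refs]) entries
        # from those indices, restricted to the optional key filter.
        #
        #   index_map = {sig_index_1: pack_1, sig_index_2: pack_2}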

                pb.update("Copied record", record_index)
                record_index += 1

    def _get_text_nodes(self):
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'text_index')[0]
        return text_index_map, self._pack_collection._index_contents(text_index_map,
            self._text_filter)

    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and set up a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_ids)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter
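
    # A minimal sketch of the filter being built (hypothetical values):
    # the text keys that a fetch limited to self.revision_ids must copy.
    #
    #   fileid_revisions  = {'file-a': ['rev-1', 'rev-2']}
    #   self._text_filter = [('file-a', 'rev-1'), ('file-a', 'rev-2')]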

    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.

        :param new_pack: The pack that has just been created.
        :return: True if the pack should be used.
        """
        return new_pack.data_inserted()


class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    regenerated.
    """

    def _extra_init(self):
        self._data_changed = False

    def _process_inventory_lines(self, inv_lines):
        """Generate a text key reference map for reconciling with."""
        repo = self._pack_collection.repo
        refs = repo._find_text_key_references_from_xml_inventory_lines(
            inv_lines)
        self._text_refs = refs
        # during reconcile we:
        #  - convert unreferenced texts to full texts
        #  - correct texts which reference a text not copied to be full texts
        #  - copy all others as-is but with corrected parents.
        #  - so at this point we don't know enough to decide what becomes a full
        #    text.
        self._text_filter = None

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        self.pb.update("Copying content texts", 3)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
            _1, key, _2, refs in
            self.new_pack.revision_index.iter_all_entries()])
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
        # 2) generate a text_nodes list that contains all the deltas that can
        #    be used as-is, with corrected parents.
        ok_nodes = []
        bad_texts = []
        discarded_nodes = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        text_index_map, text_nodes = self._get_text_nodes()
        for node in text_nodes:
            # each node is (index, key, value, refs)
            try:
                ideal_parents = tuple(ideal_index[node[1]])
            except KeyError:
                discarded_nodes.append(node)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                if ideal_parents == node[3][0]:
                    # no change needed.
                    ok_nodes.append(node)
                elif ideal_parents[0:1] == node[3][0][0:1]:
                    # the left most parent is the same, or there are no parents
                    # today. Either way, we can preserve the representation as
                    # long as we change the refs to be inserted.
                    ok_nodes.append((node[0], node[1], node[2],
                        (ideal_parents, node[3][1])))
                    self._data_changed = True
                else:
                    # Reinsert this text completely
                    bad_texts.append((node[1], ideal_parents))
                    self._data_changed = True
        # we're finished with some data.
        del ideal_index
        del text_nodes
        # 3) bulk copy the ok data
        list(self._copy_nodes_graph(ok_nodes, text_index_map,
            self.new_pack._writer, self.new_pack.text_index))
        # 4) ad hoc copy all the other texts.
        # We have to topologically insert all texts otherwise we can fail to
        # reconcile when parts of a single delta chain are preserved intact,
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
        # reinserted, and if d3 has incorrect parents it will also be
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
        # copied), so we will try to delta, but d2 is not currently able to be
        # extracted because its basis d1 is not present. Topologically sorting
        # addresses this. The following generates a sort for all the texts that
        # are being inserted without having to reference the entire text key
        # space (we only topo sort the revisions, which is smaller).
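        # Sketch of the ordering this produces (hypothetical graph):
        # tsort.topo_sort({'d1': (), 'd2': ('d1',), 'd3': ('d2',)}) returns
        # ['d1', 'd2', 'd3'], so bad_texts is reinserted parents-first.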
        topo_order = tsort.topo_sort(ancestors)
        rev_order = dict(zip(topo_order, range(len(topo_order))))
        bad_texts.sort(key=lambda key: rev_order[key[0][1]])
        transaction = repo.get_transaction()
        file_id_index = GraphIndexPrefixAdapter(
            self.new_pack.text_index,
            ('blank', ), 1,
            add_nodes_callback=self.new_pack.text_index.add_nodes)
        knit_index = KnitGraphIndex(file_id_index,
            add_callback=file_id_index.add_nodes,
            deltas=True, parents=True)
        output_knit = knit.KnitVersionedFile('reconcile-texts',
            self._pack_collection.transport,
            None,
            index=knit_index,
            access_method=_PackAccess(
                {self.new_pack.text_index:self.new_pack.access_tuple()},
                (self.new_pack._writer, self.new_pack.text_index)),
            factory=knit.KnitPlainFactory())
        for key, parent_keys in bad_texts:
            # We refer back to the new pack for delta bases while writing
            # output. A possible improvement would be to catch errors on
            # short reads and only flush then.
            self.new_pack.flush()
            parents = []
            for parent_key in parent_keys:
                if parent_key[0] != key[0]:
                    # Graph parents must match the fileid
                    raise errors.BzrError('Mismatched key parent %r:%r' %
                        (key, parent_keys))
                parents.append(parent_key[1])
            source_weave = repo.weave_store.get_weave(key[0], transaction)
            text_lines = source_weave.get_lines(key[1])
            # adapt the 'knit' to the current file_id.
            file_id_index = GraphIndexPrefixAdapter(
                self.new_pack.text_index,
                (key[0], ), 1,
                add_nodes_callback=self.new_pack.text_index.add_nodes)
            knit_index._graph_index = file_id_index
            knit_index._add_callback = file_id_index.add_nodes
            output_knit.add_lines_with_ghosts(
                key[1], parents, text_lines, random_id=True, check_content=False)
        # 5) check that nothing inserted has a reference outside the keyspace.
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
        if missing_text_keys:
            raise errors.BzrError('Reference to missing compression parents %r'
                % (missing_text_keys,))
        self._log_copied_texts()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        # XXX: we might be better off checking this at copy time.
        original_inventory_keys = set()
        inv_index = self._pack_collection.inventory_index.combined_index
        for entry in inv_index.iter_all_entries():
            original_inventory_keys.add(entry[1])
        new_inventory_keys = set()
        for entry in new_pack.inventory_index.iter_all_entries():
            new_inventory_keys.add(entry[1])
        if new_inventory_keys != original_inventory_keys:
            self._data_changed = True
        return new_pack.data_inserted() and self._data_changed


class RepositoryPackCollection(object):
    """Management of packs within a repository."""