@@ -93 +87 @@
         Pack.__init__(self,
                       # Revisions: parents list, no text compression.
                       index_builder_class(reference_lists=1),
                       # Inventory: We want to map compression only, but currently the
                       # knit code hasn't been updated enough to understand that, so we
                       # have a regular 2-list index giving parents and compression
                       # source.
                       index_builder_class(reference_lists=1),
                       # Texts: per file graph, for all fileids - so one reference list
                       # and two elements in the key tuple.
                       index_builder_class(reference_lists=1, key_elements=2),
                       # Signatures: Just blobs to store, no compression, no parents
                       # list.
                       index_builder_class(reference_lists=0),
                       # CHK based storage - just blobs, no compression or parents.
@@ -110 +104 @@
         self._pack_collection = pack_collection
         # When we make readonly indices, we need this.
         self.index_class = pack_collection._index_class
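For orientation, the five index builders above line up with the record types the pack holds: revisions and inventories each get one reference list (their parent graph), texts get one reference list plus two-element keys of (file_id, revision_id), while signatures and CHK pages are plain, parentless indices. A minimal sketch of what a texts-index node looks like, assuming the usual BTreeBuilder from breezy.bzr.btree_index is the index_builder_class (the key and value bytes below are invented for illustration):

    from breezy.bzr.btree_index import BTreeBuilder

    # Hypothetical texts index node, not part of the patch.
    builder = BTreeBuilder(reference_lists=1, key_elements=2)
    builder.add_node(
        (b'file-id', b'rev-id'),            # two-element key
        b'opaque index value',              # value bytes stored for the key
        ([(b'file-id', b'parent-rev-id')],))  # one reference list: per-file parents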
@@ -224 +221 @@
     def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
         """Filter the texts of inventories, to find the chk pages."""
         total_keys = len(keys)

         def _filtered_inv_stream():
             id_roots_set = set()
             p_id_roots_set = set()
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                bytes = record.get_bytes_as('fulltext')
-                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
-                                                             record.key)
+                lines = record.get_bytes_as('lines')
+                chk_inv = inventory.CHKInventory.deserialise(
+                    None, lines, record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 key = chk_inv.id_to_entry.key()
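The substantive change in this hunk is the record representation: get_bytes_as('fulltext') returned one bytes object, whereas get_bytes_as('lines') returns the same content as a list of byte lines, which is what CHKInventory.deserialise now accepts. A hedged sketch of the new shape (the helper name is invented; the import path assumes the breezy.bzr.inventory module this file already uses):

    def inventory_from_record(record, chk_store=None):
        # Illustration only: mirrors the loop body above.
        from breezy.bzr import inventory
        lines = record.get_bytes_as('lines')   # e.g. [b'chkinventory:\n', ...]
        # b''.join(lines) is the old 'fulltext' payload.
        return inventory.CHKInventory.deserialise(chk_store, lines, record.key)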
@@ -403 +405 @@
         trace.mutter('repacking %d %s', len(keys), message)
         self.pb.update('repacking %s' % (message,), pb_offset)
-        child_pb = ui.ui_factory.nested_progress_bar()
+        with ui.ui_factory.nested_progress_bar() as child_pb:
             stream = vf_to_stream(source_vf, keys, message, child_pb)
-            for _ in target_vf._insert_record_stream(stream,
-                                                     random_id=True, reuse_blocks=False):
+            for _, _ in target_vf._insert_record_stream(
+                    stream, random_id=True, reuse_blocks=False):
413
def _copy_revision_texts(self):
416
414
source_vf, target_vf = self._build_vfs('revision', True, False)
@@ -632 +625 @@
         # First, copy the existing CHKs on the assumption that most of them
         # will be correct. This will save us from having to reinsert (and
         # recompress) these records later at the cost of perhaps preserving a
         # few unused CHKs.
         # (Iterate but don't insert _get_filtered_inv_stream to populate the
         # variables needed by GCCHKPacker._copy_chk_texts.)
         self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                              self._get_filtered_inv_stream, 2)
         GCCHKPacker._copy_chk_texts(self)
         # Now copy and fix the inventories, and any regenerated CHKs.

         def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
             return self._get_filtered_canonicalizing_inv_stream(
                 source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
@@ -652 +646 @@
     def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
                                                 pb=None, source_chk_vf=None, target_chk_vf=None):
         """Filter the texts of inventories, regenerating CHKs to make sure they
         are canonical.
         """
         total_keys = len(keys)
         target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)

         def _filtered_inv_stream():
             stream = source_vf.get_record_stream(keys, 'groupcompress', True)
             search_key_name = None
             for idx, record in enumerate(stream):
                 # Inventories should always be with revisions; assume success.
-                bytes = record.get_bytes_as('fulltext')
+                lines = record.get_bytes_as('lines')
                 chk_inv = inventory.CHKInventory.deserialise(
-                    source_chk_vf, bytes, record.key)
+                    source_chk_vf, lines, record.key)
                 if pb is not None:
                     pb.update('inv', idx, total_keys)
                 chk_inv.id_to_entry._ensure_root()
                 if search_key_name is None:
                     # Find the name corresponding to the search_key_func
                     search_key_reg = chk_map.search_key_registry
-                    for search_key_name, func in viewitems(search_key_reg):
+                    for search_key_name, func in search_key_reg.items():
                         if func == chk_inv.id_to_entry._search_key_func:
                             break
                 canonical_inv = inventory.CHKInventory.from_inventory(
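Besides the bytes/lines switch, this hunk drops the Python 2 viewitems() helper in favour of plain items() on the registry. The reverse lookup it performs amounts to the following sketch (helper name invented; the import path assumes the breezy.bzr.chk_map module this file already uses):

    from breezy.bzr import chk_map

    def name_for_search_key_func(func):
        # Scan the registry for the name registered against this function.
        for name, registered in chk_map.search_key_registry.items():
            if registered == func:
                return name
        return None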
@@ -807 +803 @@
     """subclass of PackRepository that uses CHK based inventories."""

     def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
                  _serializer):
         """Overridden to change pack collection class."""
         super(CHKInventoryRepository, self).__init__(_format, a_controldir,
                                                      control_files, _commit_builder_class, _serializer)
         index_transport = self._transport.clone('indices')
         self._pack_collection = GCRepositoryPackCollection(self,
             self._transport, index_transport,
             self._transport.clone('upload'),
             self._transport.clone('packs'),
             _format.index_builder_class,
             _format.index_class,
             use_chk_index=self._format.supports_chks,
             )
         self.inventories = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                           add_callback=self._pack_collection.inventory_index.add_callback,
                           parents=True, is_locked=self.is_locked,
                           inconsistency_fatal=False),
             access=self._pack_collection.inventory_index.data_access)
         self.revisions = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                           add_callback=self._pack_collection.revision_index.add_callback,
                           parents=True, is_locked=self.is_locked,
                           track_external_parent_refs=True, track_new_keys=True),
             access=self._pack_collection.revision_index.data_access,
             delta=False)
         self.signatures = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                           add_callback=self._pack_collection.signature_index.add_callback,
                           parents=False, is_locked=self.is_locked,
                           inconsistency_fatal=False),
             access=self._pack_collection.signature_index.data_access,
             delta=False)
         self.texts = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.text_index.combined_index,
                           add_callback=self._pack_collection.text_index.add_callback,
                           parents=True, is_locked=self.is_locked,
                           inconsistency_fatal=False),
             access=self._pack_collection.text_index.data_access)
         # No parents, individual CHK pages don't have specific ancestry
         self.chk_bytes = GroupCompressVersionedFiles(
             _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                           add_callback=self._pack_collection.chk_index.add_callback,
                           parents=False, is_locked=self.is_locked,
                           inconsistency_fatal=False),
             access=self._pack_collection.chk_index.data_access)
         search_key_name = self._format._serializer.search_key_name
         search_key_func = chk_map.search_key_registry.get(search_key_name)
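The wiring above gives each record type its own GroupCompressVersionedFiles backed by a _GCGraphIndex: revisions, inventories and texts carry parent graphs (parents=True), while signatures and CHK pages are parentless blobs. As a rough usage sketch (helper name invented, repository assumed to be of this format), per-file ancestry then comes out of repo.texts via two-element keys:

    def text_parents(repo, file_id, revision_id):
        # texts is keyed by (file_id, revision_id); parents come from its
        # single reference list.
        with repo.lock_read():
            key = (file_id, revision_id)
            return repo.texts.get_parent_map([key]).get(key, ())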
@@ -879 +877 @@
         serializer = self._format._serializer
         result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
                                                        maximum_size=serializer.maximum_size,
                                                        search_key_name=serializer.search_key_name)
         inv_lines = result.to_lines()
         return self._inventory_add_lines(revision_id, parents,
                                          inv_lines, check_content=False)
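For reference, the call above is the whole conversion step: a plain Inventory is re-serialized as a CHKInventory whose pages land in self.chk_bytes, using the format serializer's page-size and search-key settings, and the resulting lines are added as the inventory text. A condensed sketch mirroring it (helper name invented):

    def chk_inventory_for(repo, inv):
        serializer = repo._format._serializer
        # Pages are written to repo.chk_bytes; maximum_size bounds the CHK
        # page size and search_key_name selects the key-hashing scheme.
        return inventory.CHKInventory.from_inventory(
            repo.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)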

     def _create_inv_from_null(self, delta, revision_id):
         """This will mutate new_inv directly.
@@ -951 +949 @@
             raise AssertionError("%r not in write group" % (self,))
         _mod_revision.check_not_reserved_id(new_revision_id)
         basis_tree = None
-        if basis_inv is None:
+        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
             if basis_revision_id == _mod_revision.NULL_REVISION:
                 new_inv = self._create_inv_from_null(delta, new_revision_id)
                 if new_inv.root_id is None:
                     raise errors.RootMissing()
                 inv_lines = new_inv.to_lines()
                 return self._inventory_add_lines(new_revision_id, parents,
                                                  inv_lines, check_content=False), new_inv
             else:
                 basis_tree = self.revision_tree(basis_revision_id)
                 basis_tree.lock_read()
                 basis_inv = basis_tree.root_inventory
@@ -967 +965 @@
             result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                                                      propagate_caches=propagate_caches)
             inv_lines = result.to_lines()
             return self._inventory_add_lines(new_revision_id, parents,
                                              inv_lines, check_content=False), result
@@ -973 +971 @@
             if basis_tree is not None:
                 basis_tree.unlock()
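The behavioural change here is the guard: a supplied basis inventory is only reused if it is already a CHKInventory; otherwise it is re-read from the repository's own revision tree. The delta consumed by create_by_apply_delta follows bzr's usual inventory-delta convention; a hedged illustration of its shape (file ids, paths and the root id below are invented):

    # Each delta item is (old_path, new_path, file_id, new_entry); None marks
    # an added path (no old_path) or a removal (no new_path / new_entry).
    delta = [
        (None, 'README', b'readme-id',
         inventory.InventoryFile(b'readme-id', 'README', b'TREE_ROOT')),
        ('old-name.txt', None, b'gone-id', None),
    ]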
@@ -976 +974 @@
-    def _deserialise_inventory(self, revision_id, bytes):
-        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
-                                                  revision_id)
+    def _deserialise_inventory(self, revision_id, lines):
+        return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
+                                                  revision_id)
@@ -980 +978 @@
     def _iter_inventories(self, revision_ids, ordering):
         """Iterate over many inventory objects."""
@@ -1228 +1221 @@
             uninteresting_pid_root_keys.add(
                 inv.parent_id_basename_to_file_id.key())
         chk_bytes = self.from_repository.chk_bytes

         def _filter_id_to_entry():
             interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                 self._chk_id_roots, uninteresting_root_keys)
             for record in _filter_text_keys(interesting_nodes, self._text_keys,
                                             chk_map._bytes_to_text_key):
                 if record is not None:
@@ -1239 +1233 @@
             self._chk_id_roots = None
         yield 'chk_bytes', _filter_id_to_entry()

         def _get_parent_id_basename_to_file_id_pages():
             for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                     self._chk_p_id_roots, uninteresting_pid_root_keys):
                 if record is not None:
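Both nested helpers lean on chk_map.iter_interesting_nodes, which walks the CHK pages reachable from the "interesting" root keys while pruning anything already reachable from the uninteresting (basis) roots, yielding (record, items) pairs where record can be None for pages the receiver already has. A stripped-down sketch of that traversal (function name invented):

    def new_chk_page_records(chk_bytes, interesting_roots, uninteresting_roots):
        # Yield only the CHK page records that actually need transmitting.
        for record, _items in chk_map.iter_interesting_nodes(
                chk_bytes, interesting_roots, uninteresting_roots):
            if record is not None:
                yield record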
@@ -1269 +1264 @@
         revision_ids = search.get_keys()
-        pb = ui.ui_factory.nested_progress_bar()
+        with ui.ui_factory.nested_progress_bar() as pb:
             rc = self._record_counter
             self._record_counter.setup(len(revision_ids))
             for stream_info in self._fetch_revision_texts(revision_ids):
                 yield (stream_info[0],
                        wrap_and_count(pb, rc, stream_info[1]))
             self._revision_keys = [(rev_id,) for rev_id in revision_ids]
             # TODO: The keys to exclude might be part of the search recipe
             # For now, exclude all parents that are at the edge of ancestry, for
             # which we have inventories
             from_repo = self.from_repository
             parent_keys = from_repo._find_parent_keys_of_revisions(
                 self._revision_keys)
             self.from_repository.revisions.clear_cache()
             self.from_repository.signatures.clear_cache()
             # Clear the repo's get_parent_map cache too.
             self.from_repository._unstacked_provider.disable_cache()
             self.from_repository._unstacked_provider.enable_cache()
             s = self._get_inventory_stream(self._revision_keys)
             yield (s[0], wrap_and_count(pb, rc, s[1]))
             self.from_repository.inventories.clear_cache()
             for stream_info in self._get_filtered_chk_streams(parent_keys):
                 yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
             self.from_repository.chk_bytes.clear_cache()
             s = self._get_text_stream()
             yield (s[0], wrap_and_count(pb, rc, s[1]))
             self.from_repository.texts.clear_cache()
             pb.update('Done', rc.max, rc.max)
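Since get_stream is a generator, the with block keeps the progress bar alive across every yielded substream and finishes it once the generator is exhausted or closed. Each substream is wrapped by wrap_and_count, which (roughly, as a paraphrase rather than the module's exact code) forwards records unchanged while ticking the shared record counter:

    def wrap_and_count_sketch(pb, rc, stream):
        # Paraphrase of wrap_and_count: pass records through, updating the
        # shared RecordCounter so pb can show overall fetch progress.
        seen = 0
        for record in stream:
            seen += 1
            pb.update('Estimate', rc.current + seen, rc.max)
            yield record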
@@ -1300 +1294 @@
     def get_stream_for_missing_keys(self, missing_keys):
         # missing keys can only occur when we are byte copying and not