# NOTE(review): this is the tail of a method whose ``def`` lies before the
# visible fragment; the standalone numeric lines are diff line numbers from
# the extraction, not code.  The two assignments appear to wire the
# aggregate signature/text index add_callbacks into the repository's
# versioned-files indices -- presumably so new-pack insertions propagate to
# the repo-level indices; confirm against the full source.
240
242
self.repo.signatures._index._add_callback = self.signature_index.add_callback
241
243
self.repo.texts._index._add_callback = self.text_index.add_callback
245
def _get_filtered_inv_stream(self, source_vf, keys):
246
"""Filter the texts of inventories, to find the chk pages."""
249
def _filter_inv_stream(stream):
250
for idx, record in enumerate(stream):
251
### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
252
bytes = record.get_bytes_as('fulltext')
253
chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
254
id_roots.add(chk_inv.id_to_entry.key())
255
p_id_map = chk_inv.parent_id_basename_to_file_id
256
if p_id_map is not None:
257
p_id_roots.add(p_id_map.key())
259
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
260
return _filter_inv_stream(stream), id_roots, p_id_roots
262
def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots):
263
# We want to stream the keys from 'id_roots', and things they
264
# reference, and then stream things from p_id_roots and things they
265
# reference, and then any remaining keys that we didn't get to.
267
# Note: We probably actually want multiple streams here, to help the
268
# client understand that the different levels won't compress well
270
remaining_keys = set(keys)
271
def _get_referenced_stream(root_keys):
274
remaining_keys.difference_update(cur_keys)
276
stream = source_vf.get_record_stream(cur_keys, 'unordered',
278
for record in stream:
279
bytes = record.get_bytes_as('fulltext')
280
# We don't care about search_key_func for this code,
281
# because we only care about external references.
282
node = chk_map._deserialise(bytes, record.key,
283
search_key_func=None)
284
next_keys.update(node.refs())
286
cur_keys = next_keys.intersection(remaining_keys)
287
for record in _get_referenced_stream(id_roots):
289
for record in _get_referenced_stream(p_id_roots):
292
trace.note('There were %d keys in the chk index, which'
293
' were not referenced from inventories',
295
stream = source_vf.get_record_stream(remaining_keys, 'unordered',
297
for record in stream:
243
300
# NOTE(review): from here on the fragment interleaves TWO diff hunks -- the
# paired standalone numbers (e.g. ``240``/``300``) are old/new diff line
# numbers, and most source lines are missing.  The text mixes the head of
# ``_execute_pack_operations`` with the interior of what looks like a
# pack-copying loop that builds a per-versioned-file record stream
# (inventories filtered for chk roots, chk_bytes streamed roots-first,
# everything else 'gc-optimal').  Preserved byte-for-byte for reference;
# this span is NOT runnable code as-is.
def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
244
301
reload_func=None):
245
302
"""Execute a series of pack operations.
275
332
# issue is that pages that are similar are not transmitted
276
333
# together. Perhaps get_record_stream('gc-optimal') should be
277
334
# taught about how to group chk pages?
278
336
# NOTE(review): chk_index is optional; presumably only chk-enabled repo
# formats copy it -- confirm against the full source.
if getattr(self, 'chk_index', None) is not None:
279
338
to_copy.insert(2, ('chk_index', 'chk_bytes'))
281
340
# Shouldn't we start_write_group around this?
310
369
is_locked=self.repo.is_locked),
311
370
access=target_access,
312
371
delta=source_vf._delta)
313
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
374
# NOTE(review): stream selection per versioned-file name -- inventories
# are filtered to collect chk roots, chk_bytes are streamed from those
# roots; the final 'gc-optimal' line is presumably the ``else`` branch.
if vf_name == 'inventories':
375
stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
377
elif vf_name == 'chk_bytes':
378
stream = self._get_chk_stream(source_vf, keys,
379
id_roots, p_id_roots)
381
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
314
382
target_vf.insert_record_stream(stream)
315
383
new_pack._check_references() # shouldn't be needed