# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

from __future__ import absolute_import
import time

    revision as _mod_revision,
from ..bzr.btree_index import (
    BTreeBuilder,
    BTreeGraphIndex,
    )
from ..bzr.groupcompress import (
    GroupCompressVersionedFiles,
    _GCGraphIndex,
    )
from .pack_repo import (
    NewPack,
    Packer,
    PackCommitBuilder,
    PackRepository,
    RepositoryFormatPack,
    RepositoryPackCollection,
    ResumedPack,
    )
from ..bzr.vf_repository import (
    StreamSource,
    )
from ..sixish import (
    viewitems,
    viewvalues,
    )
from ..static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised

        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
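            # (Summary, inferred from the builders above: a GC pack keeps five
            # indices -- revisions, inventories and texts each carry one
            # reference list for their parent graphs, texts use 2-element
            # (file_id, revision_id) keys, while signatures and CHK pages are
            # plain blobs with no reference lists at all.)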
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        def _write_data(data, flush=False, _buffer=self._buffer,
                        _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(data)
            _buffer[1] += len(data)
            if _buffer[1] > self._cache_limit or flush:
                data = b''.join(_buffer[0])
                _write(data)
                _update(data)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
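        # (Rough sketch of the write path set up above: ContainerWriter
        # serialises pack records and hands the bytes to _write_data, which
        # buffers them in self._buffer; once the buffered size exceeds
        # self._cache_limit, or a flush is requested, the chunks are joined,
        # written to self.write_stream and folded into self._hash.)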
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
        # no name until we finish writing the content
        self.name = None

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, We only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError(
                'pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        substream = source_vf.get_record_stream(
            keys, 'groupcompress', True)
        for idx, record in enumerate(substream):
            pb.update(message, idx + 1, len(keys))

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)

        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
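            # (The two root lists collected here as a side effect --
            # self._chk_id_roots and self._chk_p_id_roots -- are the starting
            # points _get_chk_streams uses later when walking the CHK pages.)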
        return _filtered_inv_stream()

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.

        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.

        # Note: We probably actually want multiple streams here, to help the
        # client understand that the different levels won't compress well
        # against each other.
        # Test the difference between using one Group per level, and
        # using 1 Group per prefix. (so '' (root) would get a group, then
        # all the references to search-key 'a' would get a group, etc.)
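        # (Illustrative sketch of the grouping done below: referenced keys are
        # bucketed by their search prefix, roughly
        #     keys_by_search_prefix.setdefault(prefix, []).append(key)
        # and then emitted prefix by prefix in sorted order, so pages sharing
        # a prefix end up next to each other in the new groups.)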
        total_keys = len(keys)
        remaining_keys = set(keys)
        if self._gather_text_refs:
            self._text_refs = set()

        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            keys_by_search_prefix = {}
            remaining_keys.difference_update(cur_keys)

            def handle_internal_node(node):
                for prefix, value in viewitems(node._items):
                    # We don't want to request the same key twice, and we
                    # want to order it by the first time it is seen.
                    # Even further, we don't want to request a key which is
                    # not in this group of pack files (it should be in the
                    # repo, but it doesn't have to be in the group being
                    # TODO: consider how to treat externally referenced chk
                    # pages as 'external_references' so that we
                    # always fill them in for stacked branches
                    if value not in next_keys and value in remaining_keys:
                        keys_by_search_prefix.setdefault(prefix,
                            []).append(value)

            def handle_leaf_node(node):
                # Store is None, because we know we have a LeafNode, and we
                # just want its entries
                for file_id, bytes in node.iteritems(None):
                    self._text_refs.add(chk_map._bytes_to_text_key(bytes))

            stream = source_vf.get_record_stream(cur_keys,
                                                 'as-requested', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    # An absent CHK record: we assume that the missing
                    # record is in a different pack - e.g. a page not
                    # altered by the commit we're packing.
                    continue
                bytes = record.get_bytes_as('fulltext')
                # We don't care about search_key_func for this code,
                # because we only care about external references.
                node = chk_map._deserialise(bytes, record.key,
                                            search_key_func=None)
                common_base = node._search_prefix
                if isinstance(node, chk_map.InternalNode):
                    handle_internal_node(node)
                elif parse_leaf_nodes:
                    handle_leaf_node(node)
                pb.update('chk node', counter[0], total_keys)
            # Double check that we won't be emitting any keys twice
            # If we get rid of the pre-calculation of all keys, we could
            # turn this around and do
            # next_keys.difference_update(seen_keys)
            # However, we also may have references to chk pages in another
            # pack file during autopack. We filter earlier, so we should no
            # longer need to do this
            # next_keys = next_keys.intersection(remaining_keys)
            for prefix in sorted(keys_by_search_prefix):
                cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # available keys
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        trace.mutter('There were %d keys in the chk index, %d of which'
                     ' were not referenced', total_keys,
                     len(remaining_keys))
        if self.revision_ids is None:
            stream = source_vf.get_record_stream(remaining_keys,

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = _DirectPackAccess(index_to_pack,
                                   reload_func=self._reload_func)
        if self.new_pack is None:
            raise AssertionError('No new pack has been set')
        index = getattr(self.new_pack, index_name)
        index_to_pack[index] = self.new_pack.access_tuple()
        index.set_optimize(for_size=True)
        access.set_writer(self.new_pack._writer, index,
                          self.new_pack.access_tuple())
        add_callback = index.add_nodes
        for pack in self.packs:
            sub_index = getattr(pack, index_name)
            index_to_pack[sub_index] = pack.access_tuple()
            indices.append(sub_index)
        index = _mod_index.CombinedGraphIndex(indices)
        vf = GroupCompressVersionedFiles(
            add_callback=add_callback,
            is_locked=self._pack_collection.repo.is_locked),

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(
            self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
            if really_missing:
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                                 % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _ in target_vf._insert_record_stream(stream,

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        # revision_id) keys. We don't do it yet because you really need
        # to filter out the ones that are present in the parents of the
        # rev just before the ones you are copying, otherwise the filter
        # is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys = signature_keys.intersection(self.revision_keys)
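        # (Only signatures for the revisions being copied are of interest
        # here; self.revision_keys was populated by _copy_revision_texts
        # above.)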
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024 * 1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                             old_pack.name)
                self.new_pack.abort()
                return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``brz reconcile`` to cause parent text pointers to be
    regenerated.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in viewitems(ancestor_keys))
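        # (For example, a parent-map entry of the form
        #     {(b'rev-2',): ((b'rev-1',),)}
        # is flattened here to
        #     {b'rev-2': (b'rev-1',)}
        # i.e. the plain revision-id shape used by _generate_text_key_index
        # below.)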
        # TODO: _generate_text_key_index should be much cheaper to generate from
        # a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        # be used as-is, with corrected parents.
        ok_keys = []
        new_parent_keys = {} # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # no change needed
                    ok_keys.append(key)
                else:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed. (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        """
        self.pb.update('scanning %s' % (message,), pb_offset)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct. This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # few unused CHKs.
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                             self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.

        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.
        pass

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        are canonical.
        """
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)

        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, bytes, record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in viewitems(search_key_reg):
                        if func == chk_inv.id_to_entry._search_key_func:
                            break
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                    trace.mutter(
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        chk_inv.id_to_entry.key()[0],
                        canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                    trace.mutter(
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                           canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                        record.parents, record.sha1,
                        canonical_inv.to_lines())
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions
        in this write group.

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        """
        # Ensure that all revisions added in this write group have:
        # - corresponding inventories,
        # - chk root entries for those inventories,
        # - and any present parent inventories have their chk root
        #   entries too.
        # And all this should be independent of any fallback repository.
        problems = []
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
            new_revisions_keys)
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                            (sorted(missing_corresponding),))
        # Are any chk root entries missing for any inventories? This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in viewvalues(inv_parent_map):
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
            corresponding_invs)
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
            expected_chk_roots)
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
            problems.append(
                "missing referenced chk root keys: %s."
                "Run 'brz reconcile --canonicalize-chks' on the affected "
                "repository."
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
            return problems
        # Find all interesting chk_bytes records, and make sure they are
778
# present, as well as the text keys they reference.
779
chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
780
chk_bytes_no_fallbacks._search_key_func = \
781
self.repo.chk_bytes._search_key_func
782
chk_diff = chk_map.iter_interesting_nodes(
783
chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
784
root_key_info.uninteresting_root_keys)
787
for record in _filter_text_keys(chk_diff, text_keys,
788
chk_map._bytes_to_text_key):
790
except errors.NoSuchRevision as e:
791
# XXX: It would be nice if we could give a more precise error here.
792
problems.append("missing chk node(s) for id_to_entry maps")
793
chk_diff = chk_map.iter_interesting_nodes(
794
chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
795
root_key_info.uninteresting_pid_root_keys)
797
for interesting_rec, interesting_map in chk_diff:
799
except errors.NoSuchRevision as e:
801
"missing chk node(s) for parent_id_basename_to_file_id maps")
802
present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
803
missing_text_keys = text_keys.difference(present_text_keys)
804
if missing_text_keys:
805
problems.append("missing text keys: %r"
806
% (sorted(missing_text_keys),))


class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_controldir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
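        # (Of the stores built above, revisions, inventories and texts track
        # parent graphs (parents=True) while signatures and chk_bytes hold
        # parentless blobs; only the revision index tracks external parent
        # refs and newly added keys, which _check_new_inventories relies on.)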
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        """
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                                 ' from the NULL_REVISION'
                                 % ((old_path, file_id),))
            if new_path is None:
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                                 ' no new_path %r' % (file_id,))
            if new_path == '':
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple(b'', b'').intern()
            else:
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                                                     utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id
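        # (Sketch of the two map shapes built above, inferred from the keys
        # used: id_to_entry maps (file_id,) -> serialised entry bytes, while
        # parent_id_basename_to_file_id maps (parent_id, utf8_basename) ->
        # file_id, with (b'', b'') reserved for the root entry.)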
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
        return new_inv

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
        :param delta: The inventory delta (see Inventory.apply_delta for
        :param new_revision_id: The revision id that the inventory is being
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
        :param basis_inv: The basis inventory if it is already known,
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator(which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
            basis_tree = self.revision_tree(basis_revision_id)
            basis_tree.lock_read()
            basis_inv = basis_tree.root_inventory
        result = basis_inv.create_by_apply_delta(delta, new_revision_id,
            propagate_caches=propagate_caches)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(new_revision_id, parents,
            inv_lines, check_content=False), result

        if basis_tree is not None:
            basis_tree.unlock()

    def _deserialise_inventory(self, revision_id, bytes):
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
            (revision_id,))

    def _iter_inventories(self, revision_ids, ordering):
        """Iterate over many inventory objects."""
        if ordering is None:
            ordering = 'unordered'
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, ordering, True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                texts[record.key] = None
        for key in keys:
            bytes = texts[key]
            if bytes is None:
                yield (None, key[-1])
            else:
                yield (inventory.CHKInventory.deserialise(
                    self.chk_bytes, bytes, key), key[-1])

    def _get_inventory_xml(self, revision_id):
        """Get serialized inventory as a string."""
        # Without a native 'xml' inventory, this method doesn't make sense.
        # However older working trees, and older bundles want it - so we supply
        # it allowing _get_inventory_xml to work. Bundles currently use the
        # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))

    def _find_present_inventory_keys(self, revision_keys):
        parent_map = self.inventories.get_parent_map(revision_keys)
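        # (get_parent_map only returns entries for keys that are actually
        # present, so its key set doubles as a presence check here.)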
        present_inventory_keys = set(k for k in parent_map)
        return present_inventory_keys

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revisions: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-ids has the exact revision_ids that
            altered it listed explicitly.
        """
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        with ui.ui_factory.nested_progress_bar() as pb:
            revision_keys = [(r,) for r in revision_ids]
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
            # TODO: instead of using _find_present_inventory_keys, change the
            # code paths to allow missing inventories to be tolerated.
            # However, we only want to tolerate missing parent
            # inventories, not missing inventories for revision_ids
            present_parent_inv_keys = self._find_present_inventory_keys(
                parent_keys)
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
            inventories_to_read = set(revision_ids)
            inventories_to_read.update(present_parent_inv_ids)
            root_key_info = _build_interesting_key_sets(
                self, inventories_to_read, present_parent_inv_ids)
            interesting_root_keys = root_key_info.interesting_root_keys
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                    interesting_root_keys, uninteresting_root_keys,
                    pb=pb):
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    # TODO: consider interning file_id, revision_id here, or
                    # pushing that intern() into bytes_to_info()
                    # TODO: rich_root should always be True here, for all
                    # repositories that support chk_bytes
                    if not rich_root and name_utf8 == '':
                        continue
                    try:
                        file_id_revisions[file_id].add(revision_id)
                    except KeyError:
                        file_id_revisions[file_id] = {revision_id}
        return file_id_revisions

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        with ui.ui_factory.nested_progress_bar() as pb:
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:
                        result[key] = True
        return result

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        form.
        """
        from .reconcile import PackReconciler
        with self.lock_write():
            reconciler = PackReconciler(
                self, thorough=True, canonicalize_chks=True)
            return reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise AssertionError()
        vf = self.revisions
        if revisions_iterator is None:
            revisions_iterator = self.iter_revisions(self.all_revision_ids())
        for revid, revision in revisions_iterator:
            if revision is None:
                continue
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                       parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
        if inconsistencies:
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []

        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                                                 'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    if allow_absent:
                        continue
                    raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # actually present
            # TODO: Update Repository.iter_inventories() to add
            # ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes

        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                    chk_map._bytes_to_text_key):
                if record is not None:
                    yield record
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()

        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                    self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            count = 0
            for record in stream:
                if count == rc.STEP:
                    rc.increment(count)
                    pb.update('Estimate', rc.current, rc.max)
                    count = 0
                count += 1
                yield record

        revision_ids = search.get_keys()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        self._record_counter.setup(len(revision_ids))
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield (stream_info[0],
                   wrap_and_count(pb, rc, stream_info[1]))
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_keys = from_repo._find_parent_keys_of_revisions(
            self._revision_keys)
        self.from_repository.revisions.clear_cache()
        self.from_repository.signatures.clear_cache()
        # Clear the repo's get_parent_map cache too.
        self.from_repository._unstacked_provider.disable_cache()
        self.from_repository._unstacked_provider.enable_cache()
        s = self._get_inventory_stream(self._revision_keys)
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.inventories.clear_cache()
        for stream_info in self._get_filtered_chk_streams(parent_keys):
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
        self.from_repository.chk_bytes.clear_cache()
        s = self._get_text_stream()
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.texts.clear_cache()
        pb.update('Done', rc.max, rc.max)
        pb.finished()

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
                    % (key[0],))
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
                                         allow_absent=True)
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info


class _InterestingKeyInfo(object):

    def __init__(self):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

    def all_keys(self):
        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
        else:
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)
    return result


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    """
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
        yield record
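
# (_filter_text_keys is consumed lazily: callers iterate the yielded records
# while text_keys fills up as a side effect, as _check_new_inventories and
# GroupCHKStreamSource._filter_id_to_entry do above.)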


class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True

    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    # essentially ignored by the groupcompress code.
    _fetch_uses_deltas = False
    pack_compresses = True

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_controldir('2a')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingcontroldir = property(
        _get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
                " and chk inventories")


class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees.
    """

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_controldir('development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingcontroldir = property(
        _get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return b'Bazaar development format 8\n'

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
                "group compression and chk inventories")

    supports_tree_reference = True