# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

    revision as _mod_revision,
from ..bzr.btree_index import (
from ..bzr.groupcompress import (
    GroupCompressVersionedFiles,
from .pack_repo import (
    RepositoryPackCollection,
from ..bzr.vf_repository import (
from ..static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
79
# The relative locations of the packs are constrained, but all are
80
# passed in because the caller has them, so as to avoid object churn.
81
index_builder_class = pack_collection._index_builder_class
83
if pack_collection.chk_index is not None:
84
chk_index = index_builder_class(reference_lists=0)
88
# Revisions: parents list, no text compression.
89
index_builder_class(reference_lists=1),
90
# Inventory: We want to map compression only, but currently the
91
# knit code hasn't been updated enough to understand that, so we
92
# have a regular 2-list index giving parents and compression
94
index_builder_class(reference_lists=1),
95
# Texts: per file graph, for all fileids - so one reference list
96
# and two elements in the key tuple.
97
index_builder_class(reference_lists=1, key_elements=2),
98
# Signatures: Just blobs to store, no compression, no parents
100
index_builder_class(reference_lists=0),
101
# CHK based storage - just blobs, no compression or parents.
104
self._pack_collection = pack_collection
105
# When we make readonly indices, we need this.
106
self.index_class = pack_collection._index_class
107
# where should the new pack be opened
108
self.upload_transport = pack_collection._upload_transport
109
# where are indices written out to
110
self.index_transport = pack_collection._index_transport
111
# where is the pack renamed to when it is finished?
112
self.pack_transport = pack_collection._pack_transport
113
# What file mode to upload the pack and indices with.
114
self._file_mode = file_mode
115
# tracks the content written to the .pack file.
116
self._hash = osutils.md5()
117
# a four-tuple with the length in bytes of the indices, once the pack
118
# is finalised. (rev, inv, text, sigs)
119
self.index_sizes = None
120
# How much data to cache when writing packs. Note that this is not
121
# synchronised with reads, because it's not in the transport layer, so
122
# is not safe unless the client knows it won't be reading from the pack
124
self._cache_limit = 0
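        # GCPack starts with no buffering, so every write goes straight
        # through; callers can raise this later via set_write_cache_size().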
125
# the temporary pack file name.
126
self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started?
self.start_time = time.time()
129
# open an output stream for the data added to the pack.
130
self.write_stream = self.upload_transport.open_write_stream(
131
self.random_name, mode=self._file_mode)
132
if 'pack' in debug.debug_flags:
133
trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
134
time.ctime(), self.upload_transport.base, self.random_name,
135
time.time() - self.start_time)
136
# A list of byte sequences to be written to the new pack, and the
137
# aggregate size of them. Stored as a list rather than separate
138
# variables so that the _write_data closure below can update them.
139
self._buffer = [[], 0]
140
# create a callable for adding data
142
# robertc says- this is a closure rather than a method on the object
143
# so that the variables are locals, and faster than accessing object
146
def _write_data(data, flush=False, _buffer=self._buffer,
147
_write=self.write_stream.write, _update=self._hash.update):
148
_buffer[0].append(data)
149
_buffer[1] += len(data)
151
if _buffer[1] > self._cache_limit or flush:
152
data = b''.join(_buffer[0])
156
# expose this on self, for the occasion when clients want to add data.
157
self._write_data = _write_data
158
# a pack writer object to serialise pack records.
159
self._writer = pack.ContainerWriter(self._write_data)
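        # The ContainerWriter frames each record; the closure above takes
        # care of buffering and of updating the pack's md5.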
161
# what state is the pack in? (open, finished, aborted)
163
# no name until we finish writing the content
    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references; arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):
181
def _check_references(self):
182
"""Make sure our external compression parents are present."""
183
# See GCPack._check_references for why this is empty
185
def _get_external_refs(self, index):
186
# GC repositories don't have compression parents external to a given
191
class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

def __init__(self, pack_collection, packs, suffix, revision_ids=None,
196
super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
197
revision_ids=revision_ids,
198
reload_func=reload_func)
199
self._pack_collection = pack_collection
200
# ATM, We only support this for GCCHK repositories
201
if pack_collection.chk_index is None:
202
raise AssertionError(
203
'pack_collection.chk_index should not be None')
204
self._gather_text_refs = False
205
self._chk_id_roots = []
206
self._chk_p_id_roots = []
207
self._text_refs = None
208
# set by .pack() if self.revision_ids is not None
209
self.revision_keys = None
211
def _get_progress_stream(self, source_vf, keys, message, pb):
213
substream = source_vf.get_record_stream(
214
keys, 'groupcompress', True)
215
for idx, record in enumerate(substream):
217
pb.update(message, idx + 1, len(keys))
221
def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
222
"""Filter the texts of inventories, to find the chk pages."""
223
total_keys = len(keys)
225
def _filtered_inv_stream():
227
p_id_roots_set = set()
228
stream = source_vf.get_record_stream(keys, 'groupcompress', True)
229
for idx, record in enumerate(stream):
230
# Inventories should always be with revisions; assume success.
231
lines = record.get_bytes_as('lines')
232
chk_inv = inventory.CHKInventory.deserialise(
233
None, lines, record.key)
235
pb.update('inv', idx, total_keys)
236
key = chk_inv.id_to_entry.key()
237
if key not in id_roots_set:
238
self._chk_id_roots.append(key)
239
id_roots_set.add(key)
240
p_id_map = chk_inv.parent_id_basename_to_file_id
242
raise AssertionError('Parent id -> file_id map not set')
244
if key not in p_id_roots_set:
245
p_id_roots_set.add(key)
246
self._chk_p_id_roots.append(key)
248
# We have finished processing all of the inventory records, we
249
# don't need these sets anymore
251
p_id_roots_set.clear()
252
return _filtered_inv_stream()
254
def _get_chk_streams(self, source_vf, keys, pb=None):
255
# We want to stream the keys from 'id_roots', and things they
256
# reference, and then stream things from p_id_roots and things they
257
# reference, and then any remaining keys that we didn't get to.
259
# We also group referenced texts together, so if one root references a
260
# text with prefix 'a', and another root references a node with prefix
261
# 'a', we want to yield those nodes before we yield the nodes for 'b'
262
# This keeps 'similar' nodes together.
264
# Note: We probably actually want multiple streams here, to help the
265
# client understand that the different levels won't compress well
266
# against each other.
267
# Test the difference between using one Group per level, and
268
# using 1 Group per prefix. (so '' (root) would get a group, then
269
# all the references to search-key 'a' would get a group, etc.)
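        # In effect this is a breadth-first walk of the chk maps, batching
        # nodes by search prefix so that similar keys share a group.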
270
total_keys = len(keys)
271
remaining_keys = set(keys)
273
if self._gather_text_refs:
274
self._text_refs = set()
276
def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
279
keys_by_search_prefix = {}
280
remaining_keys.difference_update(cur_keys)
283
def handle_internal_node(node):
284
for prefix, value in node._items.items():
285
# We don't want to request the same key twice, and we
286
# want to order it by the first time it is seen.
287
# Even further, we don't want to request a key which is
288
# not in this group of pack files (it should be in the
289
# repo, but it doesn't have to be in the group being
291
# TODO: consider how to treat externally referenced chk
292
# pages as 'external_references' so that we
293
# always fill them in for stacked branches
294
if value not in next_keys and value in remaining_keys:
295
keys_by_search_prefix.setdefault(prefix,
299
def handle_leaf_node(node):
300
# Store is None, because we know we have a LeafNode, and we
301
# just want its entries
302
for file_id, bytes in node.iteritems(None):
303
self._text_refs.add(chk_map._bytes_to_text_key(bytes))
306
stream = source_vf.get_record_stream(cur_keys,
307
'as-requested', True)
308
for record in stream:
309
if record.storage_kind == 'absent':
310
# An absent CHK record: we assume that the missing
311
# record is in a different pack - e.g. a page not
312
# altered by the commit we're packing.
314
bytes = record.get_bytes_as('fulltext')
315
# We don't care about search_key_func for this code,
316
# because we only care about external references.
317
node = chk_map._deserialise(bytes, record.key,
318
search_key_func=None)
319
common_base = node._search_prefix
320
if isinstance(node, chk_map.InternalNode):
321
handle_internal_node(node)
322
elif parse_leaf_nodes:
323
handle_leaf_node(node)
326
pb.update('chk node', counter[0], total_keys)
329
# Double check that we won't be emitting any keys twice
330
# If we get rid of the pre-calculation of all keys, we could
331
# turn this around and do
332
# next_keys.difference_update(seen_keys)
333
# However, we also may have references to chk pages in another
334
# pack file during autopack. We filter earlier, so we should no
335
# longer need to do this
336
# next_keys = next_keys.intersection(remaining_keys)
338
for prefix in sorted(keys_by_search_prefix):
339
cur_keys.extend(keys_by_search_prefix.pop(prefix))
340
for stream in _get_referenced_stream(self._chk_id_roots,
341
self._gather_text_refs):
343
del self._chk_id_roots
344
# while it isn't really possible for chk_id_roots to not be in the
345
# local group of packs, it is possible that the tree shape has not
346
# changed recently, so we need to filter _chk_p_id_roots by the
348
chk_p_id_roots = [key for key in self._chk_p_id_roots
349
if key in remaining_keys]
350
del self._chk_p_id_roots
351
for stream in _get_referenced_stream(chk_p_id_roots, False):
354
trace.mutter('There were %d keys in the chk index, %d of which'
355
' were not referenced', total_keys,
357
if self.revision_ids is None:
358
stream = source_vf.get_record_stream(remaining_keys,
362
def _build_vf(self, index_name, parents, delta, for_write=False):
363
"""Build a VersionedFiles instance on top of this group of packs."""
364
index_name = index_name + '_index'
366
access = _DirectPackAccess(index_to_pack,
367
reload_func=self._reload_func)
370
if self.new_pack is None:
371
raise AssertionError('No new pack has been set')
372
index = getattr(self.new_pack, index_name)
373
index_to_pack[index] = self.new_pack.access_tuple()
374
index.set_optimize(for_size=True)
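            # The new pack's index is written once and then only read, so
            # optimise it for size rather than for write speed.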
375
access.set_writer(self.new_pack._writer, index,
376
self.new_pack.access_tuple())
377
add_callback = index.add_nodes
380
for pack in self.packs:
381
sub_index = getattr(pack, index_name)
382
index_to_pack[sub_index] = pack.access_tuple()
383
indices.append(sub_index)
384
index = _mod_index.CombinedGraphIndex(indices)
386
vf = GroupCompressVersionedFiles(
388
add_callback=add_callback,
390
is_locked=self._pack_collection.repo.is_locked),
395
def _build_vfs(self, index_name, parents, delta):
396
"""Build the source and target VersionedFiles."""
397
source_vf = self._build_vf(index_name, parents,
398
delta, for_write=False)
399
target_vf = self._build_vf(index_name, parents,
400
delta, for_write=True)
401
return source_vf, target_vf
403
def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
405
trace.mutter('repacking %d %s', len(keys), message)
406
self.pb.update('repacking %s' % (message,), pb_offset)
407
with ui.ui_factory.nested_progress_bar() as child_pb:
408
stream = vf_to_stream(source_vf, keys, message, child_pb)
409
for _, _ in target_vf._insert_record_stream(
410
stream, random_id=True, reuse_blocks=False):
413
def _copy_revision_texts(self):
414
source_vf, target_vf = self._build_vfs('revision', True, False)
415
if not self.revision_keys:
416
# We are doing a full fetch, aka 'pack'
417
self.revision_keys = source_vf.keys()
418
self._copy_stream(source_vf, target_vf, self.revision_keys,
419
'revisions', self._get_progress_stream, 1)
421
def _copy_inventory_texts(self):
422
source_vf, target_vf = self._build_vfs('inventory', True, True)
423
# It is not sufficient to just use self.revision_keys, as stacked
424
# repositories can have more inventories than they have revisions.
425
# One alternative would be to do something with
426
# get_parent_map(self.revision_keys), but that shouldn't be any faster
428
inventory_keys = source_vf.keys()
429
missing_inventories = set(
430
self.revision_keys).difference(inventory_keys)
431
if missing_inventories:
432
# Go back to the original repo, to see if these are really missing
433
# https://bugs.launchpad.net/bzr/+bug/437003
434
# If we are packing a subset of the repo, it is fine to just have
435
# the data in another Pack file, which is not included in this pack
437
inv_index = self._pack_collection.repo.inventories._index
438
pmap = inv_index.get_parent_map(missing_inventories)
439
really_missing = missing_inventories.difference(pmap)
441
missing_inventories = sorted(really_missing)
442
raise ValueError('We are missing inventories for revisions: %s'
443
% (missing_inventories,))
444
self._copy_stream(source_vf, target_vf, inventory_keys,
445
'inventories', self._get_filtered_inv_stream, 2)
447
def _get_chk_vfs_for_copy(self):
448
return self._build_vfs('chk', False, False)
450
def _copy_chk_texts(self):
451
source_vf, target_vf = self._get_chk_vfs_for_copy()
452
# TODO: This is technically spurious... if it is a performance issue,
454
total_keys = source_vf.keys()
455
trace.mutter('repacking chk: %d id_to_entry roots,'
456
' %d p_id_map roots, %d total keys',
457
len(self._chk_id_roots), len(self._chk_p_id_roots),
459
self.pb.update('repacking chk', 3)
460
with ui.ui_factory.nested_progress_bar() as child_pb:
461
for stream in self._get_chk_streams(source_vf, total_keys,
463
for _, _ in target_vf._insert_record_stream(
464
stream, random_id=True, reuse_blocks=False):
467
def _copy_text_texts(self):
468
source_vf, target_vf = self._build_vfs('text', True, True)
469
# XXX: We don't walk the chk map to determine referenced (file_id,
470
# revision_id) keys. We don't do it yet because you really need
471
# to filter out the ones that are present in the parents of the
472
# rev just before the ones you are copying, otherwise the filter
473
# is grabbing too many keys...
474
text_keys = source_vf.keys()
475
self._copy_stream(source_vf, target_vf, text_keys,
476
'texts', self._get_progress_stream, 4)
478
def _copy_signature_texts(self):
479
source_vf, target_vf = self._build_vfs('signature', False, False)
480
signature_keys = source_vf.keys()
        signature_keys = signature_keys.intersection(self.revision_keys)
self._copy_stream(source_vf, target_vf, signature_keys,
483
'signatures', self._get_progress_stream, 5)
485
def _create_pack_from_packs(self):
486
self.pb.update('repacking', 0, 7)
487
self.new_pack = self.open_pack()
        # Is this necessary for GC?
self.new_pack.set_write_cache_size(1024 * 1024)
490
self._copy_revision_texts()
491
self._copy_inventory_texts()
492
self._copy_chk_texts()
493
self._copy_text_texts()
494
self._copy_signature_texts()
495
self.new_pack._check_references()
496
if not self._use_pack(self.new_pack):
497
self.new_pack.abort()
499
self.new_pack.finish_content()
500
if len(self.packs) == 1:
501
old_pack = self.packs[0]
502
if old_pack.name == self.new_pack._hash.hexdigest():
503
# The single old pack was already optimally packed.
504
trace.mutter('single pack %s was already optimally packed',
506
self.new_pack.abort()
508
self.pb.update('finishing repack', 6, 7)
509
self.new_pack.finish()
510
self._pack_collection.allocate(self.new_pack)
514
class GCCHKReconcilePacker(GCCHKPacker):
515
"""A packer which regenerates indices etc as it copies.
517
This is used by ``brz reconcile`` to cause parent text pointers to be
521
def __init__(self, *args, **kwargs):
522
super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
523
self._data_changed = False
524
self._gather_text_refs = True
526
def _copy_inventory_texts(self):
527
source_vf, target_vf = self._build_vfs('inventory', True, True)
528
self._copy_stream(source_vf, target_vf, self.revision_keys,
529
'inventories', self._get_filtered_inv_stream, 2)
530
if source_vf.keys() != self.revision_keys:
531
self._data_changed = True
533
def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
source_vf, target_vf = self._build_vfs('text', True, True)
536
trace.mutter('repacking %d texts', len(self._text_refs))
537
self.pb.update("repacking texts", 4)
538
# we have three major tasks here:
539
# 1) generate the ideal index
540
repo = self._pack_collection.repo
541
# We want the one we just wrote, so base it on self.new_pack
542
revision_vf = self._build_vf('revision', True, False, for_write=True)
543
ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
544
# Strip keys back into revision_ids.
545
ancestors = dict((k[0], tuple([p[0] for p in parents]))
546
for k, parents in ancestor_keys.items())
548
# TODO: _generate_text_key_index should be much cheaper to generate from
549
# a chk repository, rather than the current implementation
550
ideal_index = repo._generate_text_key_index(None, ancestors)
551
file_id_parent_map = source_vf.get_parent_map(self._text_refs)
552
# 2) generate a keys list that contains all the entries that can
553
# be used as-is, with corrected parents.
555
new_parent_keys = {} # (key, parent_keys)
557
NULL_REVISION = _mod_revision.NULL_REVISION
558
for key in self._text_refs:
564
ideal_parents = tuple(ideal_index[key])
566
discarded_keys.append(key)
567
self._data_changed = True
569
if ideal_parents == (NULL_REVISION,):
571
source_parents = file_id_parent_map[key]
572
if ideal_parents == source_parents:
576
# We need to change the parent graph, but we don't need to
577
# re-insert the text (since we don't pun the compression
578
# parent with the parents list)
579
self._data_changed = True
580
new_parent_keys[key] = ideal_parents
581
# we're finished with some data.
583
del file_id_parent_map
        # 3) bulk copy the data, updating records that need it

def _update_parents_for_texts():
587
stream = source_vf.get_record_stream(self._text_refs,
588
'groupcompress', False)
589
for record in stream:
590
if record.key in new_parent_keys:
591
record.parents = new_parent_keys[record.key]
593
target_vf.insert_record_stream(_update_parents_for_texts())
595
def _use_pack(self, new_pack):
596
"""Override _use_pack to check for reconcile having changed content."""
597
return new_pack.data_inserted() and self._data_changed
600
class GCCHKCanonicalizingPacker(GCCHKPacker):
601
"""A packer that ensures inventories have canonical-form CHK maps.
603
Ideally this would be part of reconcile, but it's very slow and rarely
604
needed. (It repairs repositories affected by
605
https://bugs.launchpad.net/bzr/+bug/522637).
608
def __init__(self, *args, **kwargs):
609
super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
610
self._data_changed = False
612
def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
613
"""Create and exhaust a stream, but don't insert it.
615
This is useful to get the side-effects of generating a stream.
617
self.pb.update('scanning %s' % (message,), pb_offset)
618
with ui.ui_factory.nested_progress_bar() as child_pb:
619
list(vf_to_stream(source_vf, keys, message, child_pb))
621
def _copy_inventory_texts(self):
622
source_vf, target_vf = self._build_vfs('inventory', True, True)
623
source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
624
inventory_keys = source_vf.keys()
625
# First, copy the existing CHKs on the assumption that most of them
626
# will be correct. This will save us from having to reinsert (and
627
# recompress) these records later at the cost of perhaps preserving a
629
# (Iterate but don't insert _get_filtered_inv_stream to populate the
630
# variables needed by GCCHKPacker._copy_chk_texts.)
631
self._exhaust_stream(source_vf, inventory_keys, 'inventories',
632
self._get_filtered_inv_stream, 2)
633
GCCHKPacker._copy_chk_texts(self)
634
# Now copy and fix the inventories, and any regenerated CHKs.
636
def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
637
return self._get_filtered_canonicalizing_inv_stream(
638
source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
639
self._copy_stream(source_vf, target_vf, inventory_keys,
640
'inventories', chk_canonicalizing_inv_stream, 4)
642
def _copy_chk_texts(self):
643
# No-op; in this class this happens during _copy_inventory_texts.
646
def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
647
pb=None, source_chk_vf=None, target_chk_vf=None):
648
"""Filter the texts of inventories, regenerating CHKs to make sure they
651
total_keys = len(keys)
652
target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
654
def _filtered_inv_stream():
655
stream = source_vf.get_record_stream(keys, 'groupcompress', True)
656
search_key_name = None
657
for idx, record in enumerate(stream):
658
# Inventories should always be with revisions; assume success.
659
lines = record.get_bytes_as('lines')
660
chk_inv = inventory.CHKInventory.deserialise(
661
source_chk_vf, lines, record.key)
663
pb.update('inv', idx, total_keys)
664
chk_inv.id_to_entry._ensure_root()
665
if search_key_name is None:
666
# Find the name corresponding to the search_key_func
667
search_key_reg = chk_map.search_key_registry
668
for search_key_name, func in search_key_reg.items():
669
if func == chk_inv.id_to_entry._search_key_func:
671
canonical_inv = inventory.CHKInventory.from_inventory(
672
target_chk_vf, chk_inv,
673
maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
674
search_key_name=search_key_name)
675
if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
677
'Non-canonical CHK map for id_to_entry of inv: %s '
678
'(root is %s, should be %s)' % (chk_inv.revision_id,
679
chk_inv.id_to_entry.key()[
681
canonical_inv.id_to_entry.key()[0]))
682
self._data_changed = True
683
p_id_map = chk_inv.parent_id_basename_to_file_id
684
p_id_map._ensure_root()
685
canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
686
if p_id_map.key() != canon_p_id_map.key():
688
'Non-canonical CHK map for parent_id_to_basename of '
689
'inv: %s (root is %s, should be %s)'
690
% (chk_inv.revision_id, p_id_map.key()[0],
691
canon_p_id_map.key()[0]))
692
self._data_changed = True
693
yield versionedfile.ChunkedContentFactory(
694
record.key, record.parents, record.sha1, canonical_inv.to_lines(),
695
chunks_are_lines=True)
696
# We have finished processing all of the inventory records, we
697
# don't need these sets anymore
698
return _filtered_inv_stream()
700
def _use_pack(self, new_pack):
701
"""Override _use_pack to check for reconcile having changed content."""
702
return new_pack.data_inserted() and self._data_changed
705
class GCRepositoryPackCollection(RepositoryPackCollection):
707
pack_factory = GCPack
708
resumed_pack_factory = ResumedGCPack
709
normal_packer_class = GCCHKPacker
710
optimising_packer_class = GCCHKPacker
712
def _check_new_inventories(self):
713
"""Detect missing inventories or chk root entries for the new revisions
716
:returns: list of strs, summarising any problems found. If the list is
717
empty no problems were found.
719
# Ensure that all revisions added in this write group have:
720
# - corresponding inventories,
721
# - chk root entries for those inventories,
722
# - and any present parent inventories have their chk root
724
# And all this should be independent of any fallback repository.
726
key_deps = self.repo.revisions._index._key_dependencies
727
new_revisions_keys = key_deps.get_new_keys()
728
no_fallback_inv_index = self.repo.inventories._index
729
no_fallback_chk_bytes_index = self.repo.chk_bytes._index
730
no_fallback_texts_index = self.repo.texts._index
731
inv_parent_map = no_fallback_inv_index.get_parent_map(
        # Are any inventories corresponding to the new revisions missing?
corresponding_invs = set(inv_parent_map)
735
missing_corresponding = set(new_revisions_keys)
736
missing_corresponding.difference_update(corresponding_invs)
737
if missing_corresponding:
738
problems.append("inventories missing for revisions %s" %
739
(sorted(missing_corresponding),))
741
# Are any chk root entries missing for any inventories? This includes
742
# any present parent inventories, which may be used when calculating
743
# deltas for streaming.
744
all_inv_keys = set(corresponding_invs)
745
for parent_inv_keys in inv_parent_map.values():
746
all_inv_keys.update(parent_inv_keys)
747
# Filter out ghost parents.
748
all_inv_keys.intersection_update(
749
no_fallback_inv_index.get_parent_map(all_inv_keys))
750
parent_invs_only_keys = all_inv_keys.symmetric_difference(
753
inv_ids = [key[-1] for key in all_inv_keys]
754
parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
755
root_key_info = _build_interesting_key_sets(
756
self.repo, inv_ids, parent_invs_only_ids)
757
expected_chk_roots = root_key_info.all_keys()
758
present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
760
missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
761
if missing_chk_roots:
763
"missing referenced chk root keys: %s."
764
"Run 'brz reconcile --canonicalize-chks' on the affected "
766
% (sorted(missing_chk_roots),))
767
# Don't bother checking any further.
769
# Find all interesting chk_bytes records, and make sure they are
770
# present, as well as the text keys they reference.
771
chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
772
chk_bytes_no_fallbacks._search_key_func = \
773
self.repo.chk_bytes._search_key_func
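        # Make sure the stripped-down vf uses the repository's search key
        # function before we walk the maps.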
774
chk_diff = chk_map.iter_interesting_nodes(
775
chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
776
root_key_info.uninteresting_root_keys)
779
for record in _filter_text_keys(chk_diff, text_keys,
780
chk_map._bytes_to_text_key):
782
except errors.NoSuchRevision as e:
783
# XXX: It would be nice if we could give a more precise error here.
784
problems.append("missing chk node(s) for id_to_entry maps")
785
chk_diff = chk_map.iter_interesting_nodes(
786
chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
787
root_key_info.uninteresting_pid_root_keys)
789
for interesting_rec, interesting_map in chk_diff:
791
except errors.NoSuchRevision as e:
793
"missing chk node(s) for parent_id_basename_to_file_id maps")
794
present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
795
missing_text_keys = text_keys.difference(present_text_keys)
796
if missing_text_keys:
797
problems.append("missing text keys: %r"
798
% (sorted(missing_text_keys),))
802
class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK-based inventories."""

def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
807
"""Overridden to change pack collection class."""
808
super(CHKInventoryRepository, self).__init__(_format, a_controldir,
809
control_files, _commit_builder_class, _serializer)
810
index_transport = self._transport.clone('indices')
811
self._pack_collection = GCRepositoryPackCollection(self,
812
self._transport, index_transport,
813
self._transport.clone(
815
self._transport.clone(
817
_format.index_builder_class,
819
use_chk_index=self._format.supports_chks,
821
self.inventories = GroupCompressVersionedFiles(
822
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
823
add_callback=self._pack_collection.inventory_index.add_callback,
824
parents=True, is_locked=self.is_locked,
825
inconsistency_fatal=False),
826
access=self._pack_collection.inventory_index.data_access)
827
self.revisions = GroupCompressVersionedFiles(
828
_GCGraphIndex(self._pack_collection.revision_index.combined_index,
829
add_callback=self._pack_collection.revision_index.add_callback,
830
parents=True, is_locked=self.is_locked,
831
track_external_parent_refs=True, track_new_keys=True),
832
access=self._pack_collection.revision_index.data_access,
834
self.signatures = GroupCompressVersionedFiles(
835
_GCGraphIndex(self._pack_collection.signature_index.combined_index,
836
add_callback=self._pack_collection.signature_index.add_callback,
837
parents=False, is_locked=self.is_locked,
838
inconsistency_fatal=False),
839
access=self._pack_collection.signature_index.data_access,
841
self.texts = GroupCompressVersionedFiles(
842
_GCGraphIndex(self._pack_collection.text_index.combined_index,
843
add_callback=self._pack_collection.text_index.add_callback,
844
parents=True, is_locked=self.is_locked,
845
inconsistency_fatal=False),
846
access=self._pack_collection.text_index.data_access)
        # No parents; individual CHK pages don't have specific ancestry
self.chk_bytes = GroupCompressVersionedFiles(
849
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
850
add_callback=self._pack_collection.chk_index.add_callback,
851
parents=False, is_locked=self.is_locked,
852
inconsistency_fatal=False),
853
access=self._pack_collection.chk_index.data_access)
854
search_key_name = self._format._serializer.search_key_name
855
search_key_func = chk_map.search_key_registry.get(search_key_name)
856
self.chk_bytes._search_key_func = search_key_func
857
# True when the repository object is 'write locked' (as opposed to the
858
# physical lock only taken out around changes to the pack-names list.)
859
# Another way to represent this would be a decorator around the control
860
# files object that presents logical locks as physical ones - if this
861
# gets ugly consider that alternative design. RBC 20071011
862
self._write_lock_count = 0
863
self._transaction = None
865
self._reconcile_does_inventory_gc = True
866
self._reconcile_fixes_text_parents = True
867
self._reconcile_backsup_inventory = False
869
def _add_inventory_checked(self, revision_id, inv, parents):
870
"""Add inv to the repository after checking the inputs.
872
This function can be overridden to allow different inventory styles.
874
:seealso: add_inventory, for the contract.
877
serializer = self._format._serializer
878
result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
879
maximum_size=serializer.maximum_size,
880
search_key_name=serializer.search_key_name)
881
inv_lines = result.to_lines()
882
return self._inventory_add_lines(revision_id, parents,
883
inv_lines, check_content=False)
885
def _create_inv_from_null(self, delta, revision_id):
886
"""This will mutate new_inv directly.
888
This is a simplified form of create_by_apply_delta which knows that all
889
the old values must be None, so everything is a create.
891
serializer = self._format._serializer
892
new_inv = inventory.CHKInventory(serializer.search_key_name)
893
new_inv.revision_id = revision_id
894
entry_to_bytes = new_inv._entry_to_bytes
895
id_to_entry_dict = {}
896
parent_id_basename_dict = {}
897
for old_path, new_path, file_id, entry in delta:
898
if old_path is not None:
899
raise ValueError('Invalid delta, somebody tried to delete %r'
900
' from the NULL_REVISION'
901
% ((old_path, file_id),))
903
raise ValueError('Invalid delta, delta from NULL_REVISION has'
904
' no new_path %r' % (file_id,))
906
new_inv.root_id = file_id
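                # The root has no parent directory, so it gets the empty
                # parent_id, basename key.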
907
parent_id_basename_key = StaticTuple(b'', b'').intern()
909
utf8_entry_name = entry.name.encode('utf-8')
910
parent_id_basename_key = StaticTuple(entry.parent_id,
911
utf8_entry_name).intern()
912
new_value = entry_to_bytes(entry)
914
# new_inv._path_to_fileid_cache[new_path] = file_id
915
key = StaticTuple(file_id).intern()
916
id_to_entry_dict[key] = new_value
917
parent_id_basename_dict[parent_id_basename_key] = file_id
919
new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
920
parent_id_basename_dict, maximum_size=serializer.maximum_size)
923
def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
924
parents, basis_inv=None, propagate_caches=False):
925
"""Add a new inventory expressed as a delta against another revision.
927
:param basis_revision_id: The inventory id the delta was created
929
:param delta: The inventory delta (see Inventory.apply_delta for
931
:param new_revision_id: The revision id that the inventory is being
933
:param parents: The revision ids of the parents that revision_id is
934
known to have and are in the repository already. These are supplied
935
for repositories that depend on the inventory graph for revision
936
graph access, as well as for those that pun ancestry with delta
938
:param basis_inv: The basis inventory if it is already known,
940
:param propagate_caches: If True, the caches for this inventory are
941
copied to and updated for the result if possible.
943
:returns: (validator, new_inv)
944
            The validator (which is a sha1 digest, though what is sha'd is
945
repository format specific) of the serialized inventory, and the
948
if not self.is_in_write_group():
949
raise AssertionError("%r not in write group" % (self,))
950
_mod_revision.check_not_reserved_id(new_revision_id)
952
if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
953
if basis_revision_id == _mod_revision.NULL_REVISION:
954
new_inv = self._create_inv_from_null(delta, new_revision_id)
955
if new_inv.root_id is None:
956
raise errors.RootMissing()
957
inv_lines = new_inv.to_lines()
958
return self._inventory_add_lines(new_revision_id, parents,
959
inv_lines, check_content=False), new_inv
961
basis_tree = self.revision_tree(basis_revision_id)
962
basis_tree.lock_read()
963
basis_inv = basis_tree.root_inventory
965
result = basis_inv.create_by_apply_delta(delta, new_revision_id,
966
propagate_caches=propagate_caches)
967
inv_lines = result.to_lines()
968
return self._inventory_add_lines(new_revision_id, parents,
969
inv_lines, check_content=False), result
971
if basis_tree is not None:
974
def _deserialise_inventory(self, revision_id, lines):
975
return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
978
def _iter_inventories(self, revision_ids, ordering):
979
"""Iterate over many inventory objects."""
981
ordering = 'unordered'
982
keys = [(revision_id,) for revision_id in revision_ids]
983
stream = self.inventories.get_record_stream(keys, ordering, True)
985
for record in stream:
986
if record.storage_kind != 'absent':
987
texts[record.key] = record.get_bytes_as('lines')
989
texts[record.key] = None
993
yield (None, key[-1])
995
yield (inventory.CHKInventory.deserialise(
996
self.chk_bytes, lines, key), key[-1])
998
def _get_inventory_xml(self, revision_id):
999
"""Get serialized inventory as a string."""
1000
# Without a native 'xml' inventory, this method doesn't make sense.
        # However, older working trees and older bundles want it - so we supply
# it allowing _get_inventory_xml to work. Bundles currently use the
1003
# serializer directly; this also isn't ideal, but there isn't an xml
1004
# iteration interface offered at all for repositories.
1005
return self._serializer.write_inventory_to_lines(
1006
self.get_inventory(revision_id))
1008
def _find_present_inventory_keys(self, revision_keys):
1009
parent_map = self.inventories.get_parent_map(revision_keys)
1010
present_inventory_keys = set(k for k in parent_map)
1011
return present_inventory_keys
1013
def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
1014
"""Find the file ids and versions affected by revisions.
        :param revision_ids: an iterable containing revision ids.
:param _inv_weave: The inventory weave from this repository or None.
1018
If None, the inventory weave will be opened automatically.
1019
:return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids that
altered it listed explicitly.
1023
rich_root = self.supports_rich_root()
1024
bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
1025
file_id_revisions = {}
1026
with ui.ui_factory.nested_progress_bar() as pb:
1027
revision_keys = [(r,) for r in revision_ids]
1028
parent_keys = self._find_parent_keys_of_revisions(revision_keys)
1029
# TODO: instead of using _find_present_inventory_keys, change the
1030
# code paths to allow missing inventories to be tolerated.
1031
# However, we only want to tolerate missing parent
1032
# inventories, not missing inventories for revision_ids
1033
present_parent_inv_keys = self._find_present_inventory_keys(
1035
present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
1036
inventories_to_read = set(revision_ids)
1037
inventories_to_read.update(present_parent_inv_ids)
1038
root_key_info = _build_interesting_key_sets(
1039
self, inventories_to_read, present_parent_inv_ids)
1040
interesting_root_keys = root_key_info.interesting_root_keys
1041
uninteresting_root_keys = root_key_info.uninteresting_root_keys
1042
chk_bytes = self.chk_bytes
1043
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1044
interesting_root_keys, uninteresting_root_keys,
1046
for name, bytes in items:
1047
(name_utf8, file_id, revision_id) = bytes_to_info(bytes)
1048
# TODO: consider interning file_id, revision_id here, or
1049
# pushing that intern() into bytes_to_info()
1050
# TODO: rich_root should always be True here, for all
1051
# repositories that support chk_bytes
1052
if not rich_root and name_utf8 == '':
1055
file_id_revisions[file_id].add(revision_id)
1057
file_id_revisions[file_id] = {revision_id}
1058
return file_id_revisions
1060
def find_text_key_references(self):
1061
"""Find the text key references within the repository.
1063
:return: A dictionary mapping text keys ((fileid, revision_id) tuples)
1064
to whether they were referred to by the inventory of the
1065
revision_id that they contain. The inventory texts from all present
1066
revision ids are assessed to generate this report.
1068
# XXX: Slow version but correct: rewrite as a series of delta
1069
# examinations/direct tree traversal. Note that that will require care
1070
# as a common node is reachable both from the inventory that added it,
1071
# and others afterwards.
1072
revision_keys = self.revisions.keys()
1074
rich_roots = self.supports_rich_root()
1075
with ui.ui_factory.nested_progress_bar() as pb:
1076
all_revs = self.all_revision_ids()
1077
total = len(all_revs)
1078
for pos, inv in enumerate(self.iter_inventories(all_revs)):
1079
pb.update("Finding text references", pos, total)
1080
for _, entry in inv.iter_entries():
1081
if not rich_roots and entry.file_id == inv.root_id:
1083
key = (entry.file_id, entry.revision)
1084
result.setdefault(key, False)
1085
if entry.revision == inv.revision_id:
1089
def reconcile_canonicalize_chks(self):
1090
"""Reconcile this repository to make sure all CHKs are in canonical
1093
from .reconcile import PackReconciler
1094
with self.lock_write():
1095
reconciler = PackReconciler(
1096
self, thorough=True, canonicalize_chks=True)
1097
return reconciler.reconcile()
1099
def _reconcile_pack(self, collection, packs, extension, revs, pb):
1100
packer = GCCHKReconcilePacker(collection, packs, extension)
1101
return packer.pack(pb)
1103
def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
1104
packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
1105
return packer.pack(pb)
1107
def _get_source(self, to_format):
1108
"""Return a source for streaming from this repository."""
1109
if self._format._serializer == to_format._serializer:
1110
# We must be exactly the same format, otherwise stuff like the chk
1111
# page layout might be different.
1112
# Actually, this test is just slightly looser than exact so that
1113
# CHK2 <-> 2a transfers will work.
1114
return GroupCHKStreamSource(self, to_format)
1115
return super(CHKInventoryRepository, self)._get_source(to_format)
1117
def _find_inconsistent_revision_parents(self, revisions_iterator=None):
1118
"""Find revisions with different parent lists in the revision object
1119
and in the index graph.
1121
:param revisions_iterator: None, or an iterator of (revid,
1122
Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
parents-in-revision).
1126
if not self.is_locked():
1127
raise AssertionError()
1129
if revisions_iterator is None:
1130
revisions_iterator = self.iter_revisions(self.all_revision_ids())
1131
for revid, revision in revisions_iterator:
1132
if revision is None:
1134
parent_map = vf.get_parent_map([(revid,)])
1135
parents_according_to_index = tuple(parent[-1] for parent in
1136
parent_map[(revid,)])
1137
parents_according_to_revision = tuple(revision.parent_ids)
1138
if parents_according_to_index != parents_according_to_revision:
1139
yield (revid, parents_according_to_index,
1140
parents_according_to_revision)
1142
def _check_for_inconsistent_revision_parents(self):
1143
inconsistencies = list(self._find_inconsistent_revision_parents())
1145
raise errors.BzrCheckError(
1146
"Revision index has inconsistent parents.")
1149
class GroupCHKStreamSource(StreamSource):
1150
"""Used when both the source and target repo are GroupCHK repos."""
1152
def __init__(self, from_repository, to_format):
1153
"""Create a StreamSource streaming from from_repository."""
1154
super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
1155
self._revision_keys = None
1156
self._text_keys = None
1157
self._text_fetch_order = 'groupcompress'
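        # groupcompress order keeps related texts adjacent, which lets the
        # blocks on the wire compress well.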
1158
self._chk_id_roots = None
1159
self._chk_p_id_roots = None
1161
def _get_inventory_stream(self, inventory_keys, allow_absent=False):
1162
"""Get a stream of inventory texts.
1164
When this function returns, self._chk_id_roots and self._chk_p_id_roots
1165
should be populated.
1167
self._chk_id_roots = []
1168
self._chk_p_id_roots = []
1170
def _filtered_inv_stream():
1171
id_roots_set = set()
1172
p_id_roots_set = set()
1173
source_vf = self.from_repository.inventories
1174
stream = source_vf.get_record_stream(inventory_keys,
1175
'groupcompress', True)
1176
for record in stream:
1177
if record.storage_kind == 'absent':
1181
raise errors.NoSuchRevision(self, record.key)
1182
lines = record.get_bytes_as('lines')
1183
chk_inv = inventory.CHKInventory.deserialise(None, lines,
1185
key = chk_inv.id_to_entry.key()
1186
if key not in id_roots_set:
1187
self._chk_id_roots.append(key)
1188
id_roots_set.add(key)
1189
p_id_map = chk_inv.parent_id_basename_to_file_id
1190
if p_id_map is None:
1191
raise AssertionError('Parent id -> file_id map not set')
1192
key = p_id_map.key()
1193
if key not in p_id_roots_set:
1194
p_id_roots_set.add(key)
1195
self._chk_p_id_roots.append(key)
1197
# We have finished processing all of the inventory records, we
1198
# don't need these sets anymore
1199
id_roots_set.clear()
1200
p_id_roots_set.clear()
1201
return ('inventories', _filtered_inv_stream())
1203
def _get_filtered_chk_streams(self, excluded_revision_keys):
1204
self._text_keys = set()
1205
excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
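        # The null revision has no inventory, so it can never supply an
        # uninteresting root key.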
1206
if not excluded_revision_keys:
1207
uninteresting_root_keys = set()
1208
uninteresting_pid_root_keys = set()
1210
# filter out any excluded revisions whose inventories are not
1212
# TODO: Update Repository.iter_inventories() to add
1213
# ignore_missing=True
1214
present_keys = self.from_repository._find_present_inventory_keys(
1215
excluded_revision_keys)
1216
present_ids = [k[-1] for k in present_keys]
1217
uninteresting_root_keys = set()
1218
uninteresting_pid_root_keys = set()
1219
for inv in self.from_repository.iter_inventories(present_ids):
1220
uninteresting_root_keys.add(inv.id_to_entry.key())
1221
uninteresting_pid_root_keys.add(
1222
inv.parent_id_basename_to_file_id.key())
1223
chk_bytes = self.from_repository.chk_bytes
1225
def _filter_id_to_entry():
1226
interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
1227
self._chk_id_roots, uninteresting_root_keys)
1228
for record in _filter_text_keys(interesting_nodes, self._text_keys,
1229
chk_map._bytes_to_text_key):
1230
if record is not None:
1233
self._chk_id_roots = None
1234
yield 'chk_bytes', _filter_id_to_entry()
1236
def _get_parent_id_basename_to_file_id_pages():
1237
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1238
self._chk_p_id_roots, uninteresting_pid_root_keys):
1239
if record is not None:
1242
self._chk_p_id_roots = None
1243
yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
1245
def _get_text_stream(self):
1246
        # Note: We know we don't have to handle adding root keys, because
        # the source and target have the same network name.
1248
text_stream = self.from_repository.texts.get_record_stream(
1249
self._text_keys, self._text_fetch_order, False)
1250
return ('texts', text_stream)
1252
def get_stream(self, search):
1253
def wrap_and_count(pb, rc, stream):
1254
"""Yield records from stream while showing progress."""
1256
for record in stream:
1257
if count == rc.STEP:
1259
pb.update('Estimate', rc.current, rc.max)
1264
revision_ids = search.get_keys()
1265
with ui.ui_factory.nested_progress_bar() as pb:
1266
rc = self._record_counter
1267
self._record_counter.setup(len(revision_ids))
1268
for stream_info in self._fetch_revision_texts(revision_ids):
1269
yield (stream_info[0],
1270
wrap_and_count(pb, rc, stream_info[1]))
1271
self._revision_keys = [(rev_id,) for rev_id in revision_ids]
1272
# TODO: The keys to exclude might be part of the search recipe
1273
# For now, exclude all parents that are at the edge of ancestry, for
1274
# which we have inventories
1275
from_repo = self.from_repository
1276
parent_keys = from_repo._find_parent_keys_of_revisions(
1277
self._revision_keys)
1278
self.from_repository.revisions.clear_cache()
1279
self.from_repository.signatures.clear_cache()
1280
# Clear the repo's get_parent_map cache too.
1281
self.from_repository._unstacked_provider.disable_cache()
1282
self.from_repository._unstacked_provider.enable_cache()
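        # Disabling and re-enabling drops anything already cached while
        # leaving the cache on for the streams below.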
1283
s = self._get_inventory_stream(self._revision_keys)
1284
yield (s[0], wrap_and_count(pb, rc, s[1]))
1285
self.from_repository.inventories.clear_cache()
1286
for stream_info in self._get_filtered_chk_streams(parent_keys):
1287
yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
1288
self.from_repository.chk_bytes.clear_cache()
1289
s = self._get_text_stream()
1290
yield (s[0], wrap_and_count(pb, rc, s[1]))
1291
self.from_repository.texts.clear_cache()
1292
pb.update('Done', rc.max, rc.max)
1294
def get_stream_for_missing_keys(self, missing_keys):
1295
# missing keys can only occur when we are byte copying and not
1296
# translating (because translation means we don't send
1297
# unreconstructable deltas ever).
1298
missing_inventory_keys = set()
1299
for key in missing_keys:
1300
if key[0] != 'inventories':
1301
raise AssertionError('The only missing keys we should'
1302
' be filling in are inventory keys, not %s'
1304
missing_inventory_keys.add(key[1:])
1305
if self._chk_id_roots or self._chk_p_id_roots:
1306
raise AssertionError('Cannot call get_stream_for_missing_keys'
1307
' until all of get_stream() has been consumed.')
1308
# Yield the inventory stream, so we can find the chk stream
1309
# Some of the missing_keys will be missing because they are ghosts.
1310
# As such, we can ignore them. The Sink is required to verify there are
1311
# no unavailable texts when the ghost inventories are not filled in.
1312
yield self._get_inventory_stream(missing_inventory_keys,
1314
# We use the empty set for excluded_revision_keys, to make it clear
1315
# that we want to transmit all referenced chk pages.
1316
for stream_info in self._get_filtered_chk_streams(set()):
1320
class _InterestingKeyInfo(object):
1322
self.interesting_root_keys = set()
1323
self.interesting_pid_root_keys = set()
1324
self.uninteresting_root_keys = set()
1325
self.uninteresting_pid_root_keys = set()
1327
def all_interesting(self):
1328
return self.interesting_root_keys.union(self.interesting_pid_root_keys)
1330
def all_uninteresting(self):
1331
return self.uninteresting_root_keys.union(
1332
self.uninteresting_pid_root_keys)
1335
return self.all_interesting().union(self.all_uninteresting())
1338
def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
1339
result = _InterestingKeyInfo()
1340
for inv in repo.iter_inventories(inventory_ids, 'unordered'):
1341
root_key = inv.id_to_entry.key()
1342
pid_root_key = inv.parent_id_basename_to_file_id.key()
1343
if inv.revision_id in parent_only_inv_ids:
1344
result.uninteresting_root_keys.add(root_key)
1345
result.uninteresting_pid_root_keys.add(pid_root_key)
1347
result.interesting_root_keys.add(root_key)
1348
result.interesting_pid_root_keys.add(pid_root_key)
1352
def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
1353
"""Iterate the result of iter_interesting_nodes, yielding the records
1354
and adding to text_keys.
1356
text_keys_update = text_keys.update
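    # Bind the update method once; the loop below can visit a very large
    # number of chk nodes.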
1357
for record, items in interesting_nodes_iterable:
1358
text_keys_update([bytes_to_text_key(b) for n, b in items])
1362
class RepositoryFormat2a(RepositoryFormatPack):
1363
"""A CHK repository that uses the bencode revision serializer."""
1365
repository_class = CHKInventoryRepository
1366
supports_external_lookups = True
1367
supports_chks = True
1368
_commit_builder_class = PackCommitBuilder
1369
rich_root_data = True
1370
_serializer = chk_serializer.chk_bencode_serializer
1371
_commit_inv_deltas = True
1372
# What index classes to use
1373
index_builder_class = BTreeBuilder
1374
index_class = BTreeGraphIndex
1375
# Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are two options: work in fulltexts, or require
# topological sorting. Using fulltexts is more optimal for local
1378
# operations, because the source can be smart about extracting
1379
# multiple in-a-row (and sharing strings). Topological is better
1380
# for remote, because we access less data.
1381
_fetch_order = 'unordered'
1382
# essentially ignored by the groupcompress code.
1383
_fetch_uses_deltas = False
1385
pack_compresses = True
1386
supports_tree_reference = True
1388
def _get_matching_bzrdir(self):
1389
return controldir.format_registry.make_controldir('2a')
1391
def _ignore_setting_bzrdir(self, format):
1394
_matchingcontroldir = property(
1395
_get_matching_bzrdir, _ignore_setting_bzrdir)
1398
def get_format_string(cls):
1399
return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'
1401
def get_format_description(self):
1402
"""See RepositoryFormat.get_format_description()."""
1403
return ("Repository format 2a - rich roots, group compression"
1404
" and chk inventories")
1407
class RepositoryFormat2aSubtree(RepositoryFormat2a):
1408
"""A 2a repository format that supports nested trees.
1412
def _get_matching_bzrdir(self):
1413
return controldir.format_registry.make_controldir('development-subtree')
1415
def _ignore_setting_bzrdir(self, format):
1418
_matchingcontroldir = property(
1419
_get_matching_bzrdir, _ignore_setting_bzrdir)
1422
def get_format_string(cls):
1423
return b'Bazaar development format 8\n'
1425
def get_format_description(self):
1426
"""See RepositoryFormat.get_format_description()."""
1427
return ("Development repository format 8 - nested trees, "
1428
"group compression and chk inventories")
1431
supports_tree_reference = True