# Copyright (C) 2008-2011 Canonical Ltd
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

from __future__ import absolute_import

    revision as _mod_revision,
from ..bzr.btree_index import (
from ..decorators import needs_write_lock
from ..bzr.groupcompress import (
    GroupCompressVersionedFiles,
from .pack_repo import (
    PackRootCommitBuilder,
    RepositoryPackCollection,
from ..bzr.vf_repository import (
from ..sixish import (
from ..static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
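        # Illustrative sketch (an assumption, not taken from this module): the
        # _write_data closure above buffers incoming byte strings and, once
        # _cache_limit is exceeded or flush is requested, would emit them as a
        # single combined write while keeping the running md5 current, roughly:
        #
        #     combined = ''.join(_buffer[0])
        #     _write(combined)        # one call into the transport layer
        #     _update(combined)       # keep the .pack content hash current
        #     _buffer[:] = [[], 0]    # reset the byte list and byte count
        #
        # Only the buffering lines are shown above; the flush body here is a
        # hedged reconstruction for illustration.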
        # what state is the pack in? (open, finished, aborted)
        # no name until we finish writing the content

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
            revision_ids=revision_ids,
            reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                pb.update(message, idx + 1, len(keys))

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)

        def _filtered_inv_stream():
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                    raise AssertionError('Parent id -> file_id map not set')
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            p_id_roots_set.clear()
        return _filtered_inv_stream()

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.
        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.
        # Note: We probably actually want multiple streams here, to help the
        # client understand that the different levels won't compress well
        # against each other.
        # Test the difference between using one Group per level, and
        # using 1 Group per prefix. (so '' (root) would get a group, then
        # all the references to search-key 'a' would get a group, etc.)
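        # Illustrative sketch (an assumption, not taken from the original
        # source): grouping referenced keys by their search-key prefix before
        # streaming keeps 'similar' pages adjacent, e.g.:
        #
        #     keys_by_search_prefix = {}
        #     for prefix, key in [('a', k1), ('b', k2), ('a', k3)]:
        #         keys_by_search_prefix.setdefault(prefix, []).append(key)
        #     # -> {'a': [k1, k3], 'b': [k2]}
        #
        # Emitting the prefixes in sorted order then yields k1 and k3 before
        # k2, which is what the loop below does with cur_keys.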
        total_keys = len(keys)
        remaining_keys = set(keys)
        if self._gather_text_refs:
            self._text_refs = set()

        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)

                def handle_internal_node(node):
                    for prefix, value in viewitems(node._items):
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # TODO: consider how to treat externally referenced chk
                        # pages as 'external_references' so that we
                        # always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,

                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))

                    stream = source_vf.get_record_stream(cur_keys,
                        'as-requested', True)
                    for record in stream:
                        if record.storage_kind == 'absent':
                            # An absent CHK record: we assume that the missing
                            # record is in a different pack - e.g. a page not
                            # altered by the commit we're packing.
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                            search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                            pb.update('chk node', counter[0], total_keys)
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
            self._gather_text_refs):
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        chk_p_id_roots = [key for key in self._chk_p_id_roots
            if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            trace.mutter('There were %d keys in the chk index, %d of which'
                ' were not referenced', total_keys,
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        access = _DirectPackAccess(index_to_pack,
            reload_func=self._reload_func)
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                self.new_pack.access_tuple())
            add_callback = index.add_nodes
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
        vf = GroupCompressVersionedFiles(
                add_callback=add_callback,
                is_locked=self._pack_collection.repo.is_locked),

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
            delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
            delta, for_write=True)
        return source_vf, target_vf
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
            'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                    % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
            'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
            ' %d p_id_map roots, %d total keys',
            len(self._chk_id_roots), len(self._chk_p_id_roots),
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
            for stream in self._get_chk_streams(source_vf, total_keys,
                for _ in target_vf._insert_record_stream(stream,

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        # revision_id) keys. We don't do it yet because you really need
        # to filter out the ones that are present in the parents of the
        # rev just before the ones you are copying, otherwise the filter
        # is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
            'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys = signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
            'signatures', self._get_progress_stream, 5)
    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                self.new_pack.abort()
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``brz reconcile`` to cause parent text pointers to be

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
            'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
            for k, parents in viewitems(ancestor_keys))
        # TODO: _generate_text_key_index should be much cheaper to generate from
        # a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        # be used as-is, with corrected parents.
        new_parent_keys = {}  # (key, parent_keys)
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
                ideal_parents = tuple(ideal_index[key])
                discarded_keys.append(key)
                self._data_changed = True
                if ideal_parents == (NULL_REVISION,):
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it

        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed
class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed. (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        self.pb.update('scanning %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
            list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct. This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
            self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.

        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
            'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)

        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, bytes, record.key)
                    pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in viewitems(search_key_reg):
                        if func == chk_inv.id_to_entry._search_key_func:
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        chk_inv.id_to_entry.key()[0],
                        canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                        canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                    record.parents, record.sha1,
                    canonical_inv.to_lines())
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed
class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        # Ensure that all revisions added in this write group have:
        # - corresponding inventories,
        # - chk root entries for those inventories,
        # - and any present parent inventories have their chk root
        # And all this should be independent of any fallback repository.
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                (sorted(missing_corresponding),))
        # Are any chk root entries missing for any inventories? This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in viewvalues(inv_parent_map):
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
                "missing referenced chk root keys: %s."
                "Run 'brz reconcile --canonicalize-chks' on the affected "
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
        # Find all interesting chk_bytes records, and make sure they are
        # present, as well as the text keys they reference.
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
        chk_bytes_no_fallbacks._search_key_func = \
            self.repo.chk_bytes._search_key_func
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
            root_key_info.uninteresting_root_keys)
            for record in _filter_text_keys(chk_diff, text_keys,
                chk_map._bytes_to_text_key):
        except errors.NoSuchRevision as e:
            # XXX: It would be nice if we could give a more precise error here.
            problems.append("missing chk node(s) for id_to_entry maps")
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
            root_key_info.uninteresting_pid_root_keys)
            for interesting_rec, interesting_map in chk_diff:
        except errors.NoSuchRevision as e:
                "missing chk node(s) for parent_id_basename_to_file_id maps")
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
        missing_text_keys = text_keys.difference(present_text_keys)
        if missing_text_keys:
            problems.append("missing text keys: %r"
                % (sorted(missing_text_keys),))
class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            use_chk_index=self._format.supports_chks,
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                    ' from the NULL_REVISION'
                    % ((old_path, file_id),))
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                    ' no new_path %r' % (file_id,))
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple('', '').intern()
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                    utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
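        # Illustrative sketch (assumed values, not taken from this module): a
        # delta applied against NULL_REVISION is a list of
        # (old_path, new_path, file_id, entry) tuples where every old_path is
        # None, e.g. adding a root directory and one file might look like:
        #
        #     delta = [
        #         (None, '', 'tree-root-id', root_directory_entry),
        #         (None, 'hello.txt', 'hello-file-id', hello_file_entry),
        #     ]
        #
        # The loop above maps each entry under StaticTuple(file_id) in
        # id_to_entry_dict and under StaticTuple(parent_id, utf8_name) in
        # parent_id_basename_dict before building the CHK maps.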

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
        parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
        :param delta: The inventory delta (see Inventory.apply_delta for
        :param new_revision_id: The revision id that the inventory is being
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
        :param basis_inv: The basis inventory if it is already known,
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        if basis_inv is None:
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
                basis_tree = self.revision_tree(basis_revision_id)
                basis_tree.lock_read()
                basis_inv = basis_tree.root_inventory
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
            if basis_tree is not None:

    def _deserialise_inventory(self, revision_id, bytes):
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,

    def _iter_inventories(self, revision_ids, ordering):
        """Iterate over many inventory objects."""
            ordering = 'unordered'
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, ordering, True)
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
                texts[record.key] = None
                yield (None, key[-1])
                yield (inventory.CHKInventory.deserialise(
                    self.chk_bytes, bytes, key), key[-1])

    def _get_inventory_xml(self, revision_id):
        """Get serialized inventory as a string."""
        # Without a native 'xml' inventory, this method doesn't make sense.
        # However older working trees, and older bundles want it - so we supply
        # it allowing _get_inventory_xml to work. Bundles currently use the
        # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))

    def _find_present_inventory_keys(self, revision_keys):
        parent_map = self.inventories.get_parent_map(revision_keys)
        present_inventory_keys = set(k for k in parent_map)
        return present_inventory_keys

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revision_ids: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids that
            altered it listed explicitly.
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
            revision_keys = [(r,) for r in revision_ids]
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
            # TODO: instead of using _find_present_inventory_keys, change the
            # code paths to allow missing inventories to be tolerated.
            # However, we only want to tolerate missing parent
            # inventories, not missing inventories for revision_ids
            present_parent_inv_keys = self._find_present_inventory_keys(
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
            inventories_to_read = set(revision_ids)
            inventories_to_read.update(present_parent_inv_ids)
            root_key_info = _build_interesting_key_sets(
                self, inventories_to_read, present_parent_inv_ids)
            interesting_root_keys = root_key_info.interesting_root_keys
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                interesting_root_keys, uninteresting_root_keys,
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    # TODO: consider interning file_id, revision_id here, or
                    # pushing that intern() into bytes_to_info()
                    # TODO: rich_root should always be True here, for all
                    # repositories that support chk_bytes
                    if not rich_root and name_utf8 == '':
                        file_id_revisions[file_id].add(revision_id)
                        file_id_revisions[file_id] = {revision_id}
        return file_id_revisions
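    # Illustrative sketch (assumed ids, not taken from this module): for a
    # repository where revision 'rev-2' changed 'file-a' and 'file-b', the
    # mapping returned by fileids_altered_by_revision_ids(['rev-2']) would
    # look roughly like:
    #
    #     {'file-a-id': {'rev-2'}, 'file-b-id': {'rev-2'}}
    #
    # i.e. each altered file-id maps to the set of requested revision ids
    # that altered it.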

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        from breezy.reconcile import PackReconciler
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
        reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        if not self.is_locked():
            raise AssertionError()
        if revisions_iterator is None:
            revisions_iterator = self._iter_revisions(None)
        for revid, revision in revisions_iterator:
            if revision is None:
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                    parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        self._chk_id_roots = []
        self._chk_p_id_roots = []

        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())
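    # Illustrative sketch (an assumption, not taken from this module): the
    # substreams produced by this class are ('substream name', record_iterator)
    # pairs - ('inventories', ...) here, ('chk_bytes', ...) and ('texts', ...)
    # further down - so a caller of get_stream() can dispatch on the name:
    #
    #     for name, records in stream_source.get_stream(search):
    #         if name == 'inventories':
    #             for record in records:
    #                 pass  # consume inventory records here
    #
    # 'stream_source' and 'search' are placeholder names for illustration.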

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            # filter out any excluded revisions whose inventories are not
            # TODO: Update Repository.iter_inventories() to add
            # ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes

        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                chk_map._bytes_to_text_key):
                if record is not None:
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()

        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target have the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            for record in stream:
                if count == rc.STEP:
                    pb.update('Estimate', rc.current, rc.max)
        revision_ids = search.get_keys()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        self._record_counter.setup(len(revision_ids))
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield (stream_info[0],
                wrap_and_count(pb, rc, stream_info[1]))
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_keys = from_repo._find_parent_keys_of_revisions(
            self._revision_keys)
        self.from_repository.revisions.clear_cache()
        self.from_repository.signatures.clear_cache()
        # Clear the repo's get_parent_map cache too.
        self.from_repository._unstacked_provider.disable_cache()
        self.from_repository._unstacked_provider.enable_cache()
        s = self._get_inventory_stream(self._revision_keys)
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.inventories.clear_cache()
        for stream_info in self._get_filtered_chk_streams(parent_keys):
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
        self.from_repository.chk_bytes.clear_cache()
        s = self._get_text_stream()
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.texts.clear_cache()
        pb.update('Done', rc.max, rc.max)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):


class _InterestingKeyInfo(object):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])


class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False  # essentially ignored by the groupcompress code.
    pack_compresses = True

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_bzrdir('2a')

    def _ignore_setting_bzrdir(self, format):

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(cls):
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
            " and chk inventories")


class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees.

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_bzrdir('development-subtree')

    def _ignore_setting_bzrdir(self, format):

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(cls):
        return ('Bazaar development format 8\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
            "group compression and chk inventories")

    supports_tree_reference = True