# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

from __future__ import absolute_import
    revision as _mod_revision,
from ..btree_index import (
from ..decorators import needs_write_lock
from ..groupcompress import (
    GroupCompressVersionedFiles,
from .pack_repo import (
    PackRootCommitBuilder,
    RepositoryPackCollection,
from ..vf_repository import (
from ..sixish import (
from ..static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        def _write_data(bytes, flush=False, _buffer=self._buffer,
                _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
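        # A hedged aside (not in the original source): the closure above
        # batches writes. Small byte strings accumulate in self._buffer until
        # the running total passes self._cache_limit or a flush is requested,
        # at which point they are joined and written to write_stream in one
        # call, e.g.
        #
        #   self._write_data(b'some pack record bytes')   # buffered
        #   self._write_data(b'', flush=True)             # join + single write
        #
        # which keeps the number of transport write calls low.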
        # what state is the pack in? (open, finished, aborted)
        # no name until we finish writing the content

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
            revision_ids=revision_ids,
            reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None
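        # Rough usage sketch (hypothetical, mirroring how the pack collection
        # drives packers elsewhere in this module):
        #
        #   packer = GCCHKPacker(collection, collection.all_packs(), '.autopack')
        #   new_pack = packer.pack()
        #
        # pack() (inherited from Packer) ends up in _create_pack_from_packs()
        # below, which copies revisions, inventories, chk pages, texts and
        # signatures into a single new pack.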

    def _get_progress_stream(self, source_vf, keys, message, pb):
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                    pb.update(message, idx + 1, len(keys))

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                    raise AssertionError('Parent id -> file_id map not set')
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            p_id_roots_set.clear()
        return _filtered_inv_stream()
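    # Illustration of the side-effect contract above (not original code):
    # consuming the returned stream populates self._chk_id_roots and
    # self._chk_p_id_roots as each inventory record is parsed, e.g.
    #
    #   stream = packer._get_filtered_inv_stream(source_vf, inv_keys, 'inv')
    #   list(stream)    # exhaust purely for the side effects
    #   roots = packer._chk_id_roots + packer._chk_p_id_roots
    #
    # which is exactly what GCCHKCanonicalizingPacker._exhaust_stream relies on.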

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.
        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.
        # Note: We probably actually want multiple streams here, to help the
        # client understand that the different levels won't compress well
        # against each other.
        # Test the difference between using one Group per level, and
        # using 1 Group per prefix. (so '' (root) would get a group, then
        # all the references to search-key 'a' would get a group, etc.)
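        # For illustration only (not from the original source): with a root
        # page referencing sub-pages under search prefixes 'a' and 'b', the
        # walk below yields the root first, then every page reachable under
        # 'a', then those under 'b', so pages with similar search keys end up
        # adjacent in the resulting groupcompress groups.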
        total_keys = len(keys)
        remaining_keys = set(keys)
        if self._gather_text_refs:
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                def handle_internal_node(node):
                    for prefix, value in viewitems(node._items):
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # TODO: consider how to treat externally referenced chk
                        # pages as 'external_references' so that we
                        # always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))
                    stream = source_vf.get_record_stream(cur_keys,
                        'as-requested', True)
                    for record in stream:
                        if record.storage_kind == 'absent':
                            # An absent CHK record: we assume that the missing
                            # record is in a different pack - e.g. a page not
                            # altered by the commit we're packing.
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                            search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                            pb.update('chk node', counter[0], total_keys)
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                self._gather_text_refs):
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        chk_p_id_roots = [key for key in self._chk_p_id_roots
            if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
        trace.mutter('There were %d keys in the chk index, %d of which'
            ' were not referenced', total_keys,
        if self.revision_ids is None:
            stream = source_vf.get_record_stream(remaining_keys,

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        access = _DirectPackAccess(index_to_pack,
            reload_func=self._reload_func)
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                self.new_pack.access_tuple())
            add_callback = index.add_nodes
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
        vf = GroupCompressVersionedFiles(
                add_callback=add_callback,
                is_locked=self._pack_collection.repo.is_locked),

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
            delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
            delta, for_write=True)
        return source_vf, target_vf
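    # Intended calling pattern (illustrative only):
    #
    #   source_vf, target_vf = self._build_vfs('text', True, True)
    #
    # source_vf reads from the packs being combined, while target_vf writes
    # through self.new_pack's index and pack writer as set up in _build_vf.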

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
            'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                    % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
            'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
            ' %d p_id_map roots, %d total keys',
            len(self._chk_id_roots), len(self._chk_p_id_roots),
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
            for stream in self._get_chk_streams(source_vf, total_keys,
                for _ in target_vf._insert_record_stream(stream,

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        # revision_id) keys. We don't do it yet because you really need
        # to filter out the ones that are present in the parents of the
        # rev just before the ones you are copying, otherwise the filter
        # is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
            'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
            'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                self.new_pack.abort()
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``brz reconcile`` to cause parent text pointers to be

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
            'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
            for k, parents in viewitems(ancestor_keys))
        # TODO: _generate_text_key_index should be much cheaper to generate from
        # a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        # be used as-is, with corrected parents.
        new_parent_keys = {} # (key, parent_keys)
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
                ideal_parents = tuple(ideal_index[key])
                discarded_keys.append(key)
                self._data_changed = True
                if ideal_parents == (NULL_REVISION,):
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed. (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        self.pb.update('scanning %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
            list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct. This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
            self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
            'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, bytes, record.key)
                    pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in viewitems(search_key_reg):
                        if func == chk_inv.id_to_entry._search_key_func:
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        chk_inv.id_to_entry.key()[0],
                        canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                        canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                    record.parents, record.sha1,
                    canonical_inv.to_lines())
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        # Ensure that all revisions added in this write group have:
        # - corresponding inventories,
        # - chk root entries for those inventories,
        # - and any present parent inventories have their chk root
        # And all this should be independent of any fallback repository.
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                (sorted(missing_corresponding),))
        # Are any chk root entries missing for any inventories? This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in viewvalues(inv_parent_map):
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
                "missing referenced chk root keys: %s."
                "Run 'brz reconcile --canonicalize-chks' on the affected "
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
        # Find all interesting chk_bytes records, and make sure they are
        # present, as well as the text keys they reference.
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
        chk_bytes_no_fallbacks._search_key_func = \
            self.repo.chk_bytes._search_key_func
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
            root_key_info.uninteresting_root_keys)
            for record in _filter_text_keys(chk_diff, text_keys,
                    chk_map._bytes_to_text_key):
        except errors.NoSuchRevision as e:
            # XXX: It would be nice if we could give a more precise error here.
            problems.append("missing chk node(s) for id_to_entry maps")
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
            root_key_info.uninteresting_pid_root_keys)
            for interesting_rec, interesting_map in chk_diff:
        except errors.NoSuchRevision as e:
                "missing chk node(s) for parent_id_basename_to_file_id maps")
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
        missing_text_keys = text_keys.difference(present_text_keys)
        if missing_text_keys:
            problems.append("missing text keys: %r"
                % (sorted(missing_text_keys),))


class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            use_chk_index=self._format.supports_chks,
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
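        # A hedged aside: for 2a repositories the serializer's
        # search_key_name is expected to be something like 'hash-255-way',
        # and the registry lookup above resolves that name to the function
        # used to place CHK map entries onto pages, e.g.
        #
        #   func = chk_map.search_key_registry.get('hash-255-way')
        #
        # so chk_bytes deserialises pages consistently with the format.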
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                    ' from the NULL_REVISION'
                    % ((old_path, file_id),))
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                    ' no new_path %r' % (file_id,))
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple('', '').intern()
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                    utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
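        # Hypothetical illustration of the delta shape this method accepts
        # (every old_path is None because everything is a create):
        #
        #   delta = [
        #       (None, '', 'TREE_ROOT', root_entry),
        #       (None, 'foo', 'foo-file-id', foo_entry),
        #   ]
        #
        # where each entry is an InventoryEntry carrying name, parent_id and
        # the other fields serialized by entry_to_bytes above.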

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
            parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
        :param delta: The inventory delta (see Inventory.apply_delta for
        :param new_revision_id: The revision id that the inventory is being
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
        :param basis_inv: The basis inventory if it is already known,
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        if basis_inv is None:
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
                basis_tree = self.revision_tree(basis_revision_id)
                basis_tree.lock_read()
                basis_inv = basis_tree.root_inventory
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
            if basis_tree is not None:

    def _deserialise_inventory(self, revision_id, bytes):
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,

    def _iter_inventories(self, revision_ids, ordering):
        """Iterate over many inventory objects."""
            ordering = 'unordered'
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, ordering, True)
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
                texts[record.key] = None
                yield (None, key[-1])
                yield (inventory.CHKInventory.deserialise(
                    self.chk_bytes, bytes, key), key[-1])

    def _get_inventory_xml(self, revision_id):
        """Get serialized inventory as a string."""
        # Without a native 'xml' inventory, this method doesn't make sense.
        # However, older working trees and older bundles want it - so we supply
        # it allowing _get_inventory_xml to work. Bundles currently use the
        # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))

    def _find_present_inventory_keys(self, revision_keys):
        parent_map = self.inventories.get_parent_map(revision_keys)
        present_inventory_keys = set(k for k in parent_map)
        return present_inventory_keys

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revision_ids: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids that
            altered it listed explicitly.
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
            revision_keys = [(r,) for r in revision_ids]
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
            # TODO: instead of using _find_present_inventory_keys, change the
            # code paths to allow missing inventories to be tolerated.
            # However, we only want to tolerate missing parent
            # inventories, not missing inventories for revision_ids
            present_parent_inv_keys = self._find_present_inventory_keys(
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
            inventories_to_read = set(revision_ids)
            inventories_to_read.update(present_parent_inv_ids)
            root_key_info = _build_interesting_key_sets(
                self, inventories_to_read, present_parent_inv_ids)
            interesting_root_keys = root_key_info.interesting_root_keys
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                    interesting_root_keys, uninteresting_root_keys,
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    # TODO: consider interning file_id, revision_id here, or
                    # pushing that intern() into bytes_to_info()
                    # TODO: rich_root should always be True here, for all
                    # repositories that support chk_bytes
                    if not rich_root and name_utf8 == '':
                        file_id_revisions[file_id].add(revision_id)
                        file_id_revisions[file_id] = {revision_id}
        return file_id_revisions
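    # Shape of the result above, as an illustration (hypothetical ids):
    #
    #   {'file-id-1': {'rev-1', 'rev-3'}, 'file-id-2': {'rev-2'}}
    #
    # i.e. each file id maps to exactly the supplied revision ids that
    # introduced a new text for it.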

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        from breezy.reconcile import PackReconciler
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
        reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        if not self.is_locked():
            raise AssertionError()
        if revisions_iterator is None:
            revisions_iterator = self._iter_revisions(None)
        for revid, revision in revisions_iterator:
            if revision is None:
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                    parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                        raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())
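    # The tuple returned above is one of the (name, stream) pairs that
    # get_stream() yields; a consumer sketch (illustrative only, the
    # 'insert_substream' name is a hypothetical stand-in for the real
    # StreamSink machinery):
    #
    #   for substream_kind, substream in source.get_stream(search):
    #       sink.insert_substream(substream_kind, substream)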

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            # filter out any excluded revisions whose inventories are not
            # TODO: Update Repository.iter_inventories() to add
            # ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                    chk_map._bytes_to_text_key):
                if record is not None:
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                    self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target share the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            for record in stream:
                if count == rc.STEP:
                    pb.update('Estimate', rc.current, rc.max)
        revision_ids = search.get_keys()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        self._record_counter.setup(len(revision_ids))
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield (stream_info[0],
                wrap_and_count(pb, rc, stream_info[1]))
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_keys = from_repo._find_parent_keys_of_revisions(
            self._revision_keys)
        self.from_repository.revisions.clear_cache()
        self.from_repository.signatures.clear_cache()
        # Clear the repo's get_parent_map cache too.
        self.from_repository._unstacked_provider.disable_cache()
        self.from_repository._unstacked_provider.enable_cache()
        s = self._get_inventory_stream(self._revision_keys)
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.inventories.clear_cache()
        for stream_info in self._get_filtered_chk_streams(parent_keys):
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
        self.from_repository.chk_bytes.clear_cache()
        s = self._get_text_stream()
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.texts.clear_cache()
        pb.update('Done', rc.max, rc.max)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):


class _InterestingKeyInfo(object):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
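    # Usage sketch (illustrative): text_keys is updated in place while the
    # caller consumes the records, e.g.
    #
    #   text_keys = set()
    #   for record in _filter_text_keys(chk_diff, text_keys,
    #           chk_map._bytes_to_text_key):
    #       pass  # stream the chk record somewhere
    #   # text_keys now holds every (file_id, revision_id) referenced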


class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
    pack_compresses = True

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_bzrdir('2a')

    def _ignore_setting_bzrdir(self, format):

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(cls):
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
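    # A hedged usage sketch: repositories in this format are normally created
    # through the controldir format registry rather than by instantiating the
    # class directly, e.g.
    #
    #   from breezy import controldir
    #   bzrdir_format = controldir.format_registry.make_bzrdir('2a')
    #
    # which is the same lookup _get_matching_bzrdir() performs above.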

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
            " and chk inventories")


class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees.

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_bzrdir('development-subtree')

    def _ignore_setting_bzrdir(self, format):

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(cls):
        return ('Bazaar development format 8\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
            "group compression and chk inventories")

    supports_tree_reference = True