# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

import time

from bzrlib import (
    bzrdir,
    chk_map,
    chk_serializer,
    debug,
    errors,
    index as _mod_index,
    inventory,
    knit,
    osutils,
    pack,
    remote,
    repository,
    revision as _mod_revision,
    trace,
    ui,
    )
from bzrlib.btree_index import (
    BTreeGraphIndex,
    BTreeBuilder,
    )
from bzrlib.index import GraphIndex, GraphIndexBuilder
from bzrlib.groupcompress import (
    _GCGraphIndex,
    GroupCompressVersionedFiles,
    )
from bzrlib.repofmt.pack_repo import (
    Pack,
    NewPack,
    KnitPackRepository,
    RepositoryFormatPack,
    ResumedPack,
    Packer,
    PackRootCommitBuilder,
    RepositoryPackCollection,
    )
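# This module defines the groupcompress/CHK pack machinery: GCPack and
# ResumedGCPack (the pack file objects), GCCHKPacker and GCCHKReconcilePacker
# (repacking), GCRepositoryPackCollection, CHKInventoryRepository,
# GroupCHKStreamSource, and the RepositoryFormatCHK1/CHK2 format classes.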


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index=chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            # buffer cap
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
                _write(bytes)
                _update(bytes)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
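        # Every byte destined for the pack file funnels through _write_data:
        # writes are buffered until _cache_limit is exceeded (or flush=True),
        # and the same bytes update self._hash, so the digest always matches
        # exactly what was handed to the write stream.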

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection.  It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references; arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file.
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        def pb_stream():
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                if pb is not None:
                    pb.update(message, idx + 1, len(keys))
                yield record
        return pb_stream()

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, so we
            # don't need these sets anymore.
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()
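        # As a side effect of streaming the inventories, this gathers the
        # id_to_entry and parent_id_basename_to_file_id root keys into
        # self._chk_id_roots / self._chk_p_id_roots, which seed the chk page
        # walk in _get_chk_streams() below.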

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.
        #
        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'.
        # This keeps 'similar' nodes together.
        #
        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        #       Test the difference between using one Group per level, and
        #       using 1 Group per prefix. (so '' (root) would get a group, then
        #       all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            cur_keys = root_keys
            while cur_keys:
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                next_keys = set()
                def handle_internal_node(node):
                    for prefix, value in node._items.iteritems():
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # packed.)
                        # TODO: consider how to treat externally referenced chk
                        #       pages as 'external_references' so that we
                        #       always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                                []).append(value)
                            next_keys.add(value)
                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
                        self._text_refs.add((file_id, revision_id))
                def next_stream():
                    stream = source_vf.get_record_stream(cur_keys,
                                                         'as-requested', True)
                    for record in stream:
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                                                    search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                        counter[0] += 1
                        if pb is not None:
                            pb.update('chk node', counter[0], total_keys)
                        yield record
                yield next_stream()
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                cur_keys = []
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # While it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # remaining keys.
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        if remaining_keys:
            trace.mutter('There were %d keys in the chk index, %d of which'
                         ' were not referenced', total_keys,
                         len(remaining_keys))
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,
                                                     'unordered', True)
                yield stream

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = knit._DirectPackAccess(index_to_pack)
        if for_write:
            # Use the new pack we are writing into.
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                          add_callback=add_callback,
                          parents=parents,
                          is_locked=self._pack_collection.repo.is_locked),
            access=access,
            delta=delta)
        return vf

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf
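        # The source VF reads from the combined indices of the packs being
        # repacked; the target VF has a writer attached to self.new_pack (see
        # _build_vf(for_write=True)), so inserting the source's record stream
        # into the target copies the data into the new pack.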

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,
                                                     random_id=True,
                                                     reuse_blocks=False):
                pass
        finally:
            child_pb.finished()

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._build_vfs('chk', False, False)
        # TODO: This is technically spurious... if it is a performance issue,
        #       remove it.
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _ in target_vf._insert_record_stream(stream,
                                                         random_id=True,
                                                         reuse_blocks=False):
                    pass
        finally:
            child_pb.finished()

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys.  We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'text', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys = signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)
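        # The copy methods above are invoked in a fixed order by
        # _create_pack_from_packs() below: revisions first, then inventories
        # (which record the chk root keys), then the chk pages themselves,
        # then texts and signatures.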

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    correct.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.iteritems())
        # TODO: _generate_text_key_index should be much cheaper to generate
        #       from a chk repository, rather than the current implementation.
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        #    be used as-is, with corrected parents.
        ok_keys = []
        new_parent_keys = {} # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # no change needed.
                    ok_keys.append(key)
                else:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del ideal_index
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed
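    # The reconcile packer only keeps the new pack when the rewrite actually
    # changed something (dropped or re-parented text keys); otherwise
    # _use_pack() returns False and _create_pack_from_packs() aborts the
    # freshly written pack.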


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack

    def _already_packed(self):
        """Is the collection already packed?"""
        # Always repack GC repositories for now
        return False

    def _execute_pack_operations(self, pack_operations,
                                 _packer_class=GCCHKPacker,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :param _packer_class: The class of packer to use (default: Packer).
        """
        # XXX: Copied across from RepositoryPackCollection simply because we
        #      want to override the _packer_class ... :(
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            packer = GCCHKPacker(self, packs, '.autopack',
                                 reload_func=reload_func)
            try:
                packer.pack()
            except errors.RetryWithNewPacks:
                # An exception is propagating out of this context, make sure
                # this packer has cleaned up. Packer() doesn't set its new_pack
                # state into the RepositoryPackCollection object, so we only
                # have access to it directly here.
                if packer.new_pack is not None:
                    packer.new_pack.abort()
                raise
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs.
        self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)


class CHKInventoryRepository(KnitPackRepository):
    """Subclass of KnitPackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
                                    _commit_builder_class, _serializer)
        # and now replace everything it did :)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.chk_index.data_access)
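        # Of the stores wired up above, revisions, inventories and texts track
        # a parent graph in their indices (parents=True); signatures and
        # chk_bytes do not, since CHK pages are content-addressed blobs with
        # no per-record ancestry.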
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _get_null_inventory(self):
        serializer = self._format._serializer
        null_inv = inventory.CHKInventory(serializer.search_key_name)
        search_key_func = chk_map.search_key_registry.get(
            serializer.search_key_name)
        null_inv.id_to_entry = chk_map.CHKMap(self.chk_bytes,
            None, search_key_func)
        null_inv.id_to_entry._root_node.set_maximum_size(
            serializer.maximum_size)
        null_inv.parent_id_basename_to_file_id = chk_map.CHKMap(
            self.chk_bytes, None, search_key_func)
        null_inv.parent_id_basename_to_file_id._root_node.set_maximum_size(
            serializer.maximum_size)
        null_inv.parent_id_basename_to_file_id._root_node._key_width = 2
        null_inv.root_id = None
        return null_inv

    def _create_inv_from_null(self, delta, new_revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        """
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = new_revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                                 ' from the NULL_REVISION'
                                 % ((old_path, file_id),))
            if new_path is None:
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                                 ' no new_path %r' % (file_id,))
            if new_path == '':
                new_inv.root_id = file_id
                parent_id_basename_key = '', ''
            else:
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = (entry.parent_id, utf8_entry_name)
            new_value = entry_to_bytes(entry)
            ## new_inv._path_to_fileid_cache[new_path] = file_id
            id_to_entry_dict[(file_id,)] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id

        search_key_func = chk_map.search_key_registry.get(
            serializer.search_key_name)
        maximum_size = serializer.maximum_size
        root_key = chk_map.CHKMap.from_dict(self.chk_bytes, id_to_entry_dict,
            maximum_size=maximum_size, key_width=1,
            search_key_func=search_key_func)
        new_inv.id_to_entry = chk_map.CHKMap(self.chk_bytes, root_key,
            search_key_func)
        root_key = chk_map.CHKMap.from_dict(self.chk_bytes,
            parent_id_basename_dict,
            maximum_size=maximum_size, key_width=2,
            search_key_func=search_key_func)
        new_inv.parent_id_basename_to_file_id = chk_map.CHKMap(self.chk_bytes,
            root_key, search_key_func)
        return new_inv

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
            against. (This does not have to be a direct parent.)
        :param delta: The inventory delta (see Inventory.apply_delta for
            details.)
        :param new_revision_id: The revision id that the inventory is being
            added for.
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
            compression.
        :param basis_inv: The basis inventory if it is already known,
            otherwise None.
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        basis_tree = None
        if basis_inv is None:
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
            else:
                basis_tree = self.revision_tree(basis_revision_id)
                basis_tree.lock_read()
                basis_inv = basis_tree.inventory
        try:
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
        finally:
            if basis_tree is not None:
                basis_tree.unlock()

    def _iter_inventories(self, revision_ids):
        """Iterate over many inventory objects."""
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                raise errors.NoSuchRevision(self, record.key)
        for key in keys:
            yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)

    def _iter_inventory_xmls(self, revision_ids):
        # Without a native 'xml' inventory, this method doesn't make sense, so
        # make it raise to trap naughty direct users.
        raise NotImplementedError(self._iter_inventory_xmls)

    def _find_parent_ids_of_revisions(self, revision_ids):
        # TODO: we probably want to make this a helper that other code can get
        #       at.
        parent_map = self.get_parent_map(revision_ids)
        parents = set()
        map(parents.update, parent_map.itervalues())
        parents.difference_update(revision_ids)
        parents.discard(_mod_revision.NULL_REVISION)
        return parents

    def _find_present_inventory_ids(self, revision_ids):
        keys = [(r,) for r in revision_ids]
        parent_map = self.inventories.get_parent_map(keys)
        present_inventory_ids = set(k[-1] for k in parent_map)
        return present_inventory_ids
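        # Presence is determined purely from the inventory index (via
        # get_parent_map on the keys), so no inventory texts have to be read
        # to answer this question.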

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revision_ids: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids
            that altered it listed explicitly.
        """
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
        try:
            parent_ids = self._find_parent_ids_of_revisions(revision_ids)
            present_parent_inv_ids = self._find_present_inventory_ids(parent_ids)
            uninteresting_root_keys = set()
            interesting_root_keys = set()
            inventories_to_read = set(present_parent_inv_ids)
            inventories_to_read.update(revision_ids)
            for inv in self.iter_inventories(inventories_to_read):
                entry_chk_root_key = inv.id_to_entry.key()
                if inv.revision_id in present_parent_inv_ids:
                    uninteresting_root_keys.add(entry_chk_root_key)
                else:
                    interesting_root_keys.add(entry_chk_root_key)

            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        interesting_root_keys, uninteresting_root_keys,
                        pb=pb):
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    if not rich_root and name_utf8 == '':
                        continue
                    try:
                        file_id_revisions[file_id].add(revision_id)
                    except KeyError:
                        file_id_revisions[file_id] = set([revision_id])
        finally:
            pb.finished()
        return file_id_revisions

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
        try:
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:
                        result[key] = True
            return result
        finally:
            pb.finished()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if isinstance(to_format, remote.RemoteRepositoryFormat):
            # Can't just check attributes on to_format with the current code,
            # work around this:
            to_format._ensure_real()
            to_format = to_format._custom_format
        if to_format.__class__ is self._format.__class__:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)


class GroupCHKStreamSource(repository.StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        will be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                                                 'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    if allow_absent:
                        continue
                    raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, so we
            # don't need these sets anymore.
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())
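    # _get_inventory_stream mirrors GCCHKPacker._get_filtered_inv_stream: while
    # the inventory records are streamed, the id_to_entry and
    # parent_id_basename_to_file_id root keys are collected so that
    # _get_filtered_chk_streams() can walk exactly the referenced chk pages.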

    def _find_present_inventories(self, revision_ids):
        revision_keys = [(r,) for r in revision_ids]
        inventories = self.from_repository.inventories
        present_inventories = inventories.get_parent_map(revision_keys)
        return [p[-1] for p in present_inventories]

    def _get_filtered_chk_streams(self, excluded_revision_ids):
        self._text_keys = set()
        excluded_revision_ids.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_ids:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # actually present.
            # TODO: Update Repository.iter_inventories() to add
            #       ignore_missing=True
            present_ids = self.from_repository._find_present_inventory_ids(
                excluded_revision_ids)
            present_ids = self._find_present_inventories(excluded_revision_ids)
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_id_roots, uninteresting_root_keys):
                for name, bytes in items:
                    # Note: we don't care about name_utf8, because we are
                    # always rich-root.
                    _, file_id, revision_id = bytes_to_info(bytes)
                    self._text_keys.add((file_id, revision_id))
                if record is not None:
                    yield record
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
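    # Two separate 'chk_bytes' substreams are yielded: first the id_to_entry
    # pages, then the parent_id_basename_to_file_id pages. Walking the
    # id_to_entry pages also fills self._text_keys, which _get_text_stream()
    # relies on, so these substreams must be consumed before the text stream
    # is requested.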

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are GCCHK, and those always support rich-roots
        # We may want to request as 'unordered', in case the source has done a
        return ('texts', self.from_repository.texts.get_record_stream(
            self._text_keys, 'groupcompress', False))

    def get_stream(self, search):
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_inventory_stream(self._revision_keys)
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
        for stream_info in self._get_filtered_chk_streams(parent_ids):
            yield stream_info
        yield self._get_text_stream()
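    # Overall stream order: revision texts, then inventories, then the two
    # chk_bytes substreams, then file texts. The text keys are only known once
    # the chk pages have been walked, which is why the text stream comes last.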

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
                    % (key[0],))
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
                                         allow_absent=True)
        # We use the empty set for excluded_revision_ids, to make it clear that
        # we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info


class RepositoryFormatCHK1(RepositoryFormatPack):
    """A hashed CHK+group compress pack repository."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    # For right now, setting this to True gives us InterModel1And2 rather
    # than InterDifferingSerializer
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_serializer_255_bigpage
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    #       seen yet. There are 2 options: work in fulltexts, or require
    #       topological sorting. Using fulltexts is more optimal for local
    #       operations, because the source can be smart about extracting
    #       multiple in-a-row (and sharing strings). Topological is better
    #       for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development6-rich-root')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
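    # Illustrative only (path and variable names are hypothetical): a
    # repository in this format is normally obtained via the matching bzrdir
    # format registered above, roughly:
    #
    #   format = bzrdir.format_registry.make_bzrdir('development6-rich-root')
    #   a_bzrdir = format.initialize('path/to/new/location')
    #   repo = a_bzrdir.create_repository()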

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - group compression and chk inventory'
                ' (needs bzr.dev from 1.14)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - rich roots, group compression"
                " and chk inventories")

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)


class RepositoryFormatCHK2(RepositoryFormatCHK1):
    """A CHK repository that uses the bencode revision serializer."""

    _serializer = chk_serializer.chk_bencode_serializer

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development7-rich-root')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - chk repository with bencode '
                'revision serialization (needs bzr.dev from 1.15)\n')