/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/repofmt/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Repository formats using CHK inventories and groupcompress compression."""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
import time
 
22
 
 
23
from .. import (
 
24
    controldir,
 
25
    debug,
 
26
    errors,
 
27
    osutils,
 
28
    revision as _mod_revision,
 
29
    trace,
 
30
    ui,
 
31
    )
 
32
from ..bzr import (
 
33
    chk_map,
 
34
    chk_serializer,
 
35
    index as _mod_index,
 
36
    inventory,
 
37
    pack,
 
38
    versionedfile,
 
39
    )
 
40
from ..bzr.btree_index import (
 
41
    BTreeGraphIndex,
 
42
    BTreeBuilder,
 
43
    )
 
44
from ..decorators import needs_write_lock
 
45
from ..bzr.groupcompress import (
 
46
    _GCGraphIndex,
 
47
    GroupCompressVersionedFiles,
 
48
    )
 
49
from .pack_repo import (
 
50
    _DirectPackAccess,
 
51
    Pack,
 
52
    NewPack,
 
53
    PackRepository,
 
54
    PackRootCommitBuilder,
 
55
    RepositoryPackCollection,
 
56
    RepositoryFormatPack,
 
57
    ResumedPack,
 
58
    Packer,
 
59
    )
 
60
from ..bzr.vf_repository import (
 
61
    StreamSource,
 
62
    )
 
63
from ..sixish import (
 
64
    viewitems,
 
65
    viewvalues,
 
66
    )
 
67
from ..static_tuple import StaticTuple
 
68
 
 
69
 
 
70
class GCPack(NewPack):
 
71
 
 
72
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
 
73
        """Create a NewPack instance.
 
74
 
 
75
        :param pack_collection: A PackCollection into which this is being
 
76
            inserted.
 
77
        :param upload_suffix: An optional suffix to be given to any temporary
 
78
            files created during the pack creation. e.g '.autopack'
 
79
        :param file_mode: An optional file mode to create the new files with.
 
80
        """
 
81
        # replaced from NewPack to:
 
82
        # - change inventory reference list length to 1
 
83
        # - change texts reference lists to 1
 
84
        # TODO: patch this to be parameterised
 
85
 
 
86
        # The relative locations of the packs are constrained, but all are
 
87
        # passed in because the caller has them, so as to avoid object churn.
 
88
        index_builder_class = pack_collection._index_builder_class
 
89
        # from brisbane-core
 
90
        if pack_collection.chk_index is not None:
 
91
            chk_index = index_builder_class(reference_lists=0)
 
92
        else:
 
93
            chk_index = None
 
94
        Pack.__init__(self,
 
95
            # Revisions: parents list, no text compression.
 
96
            index_builder_class(reference_lists=1),
 
97
            # Inventory: We want to map compression only, but currently the
 
98
            # knit code hasn't been updated enough to understand that, so we
 
99
            # have a regular 2-list index giving parents and compression
 
100
            # source.
 
101
            index_builder_class(reference_lists=1),
 
102
            # Texts: per file graph, for all fileids - so one reference list
 
103
            # and two elements in the key tuple.
 
104
            index_builder_class(reference_lists=1, key_elements=2),
 
105
            # Signatures: Just blobs to store, no compression, no parents
 
106
            # listing.
 
107
            index_builder_class(reference_lists=0),
 
108
            # CHK based storage - just blobs, no compression or parents.
 
109
            chk_index=chk_index
 
110
            )
 
111
        self._pack_collection = pack_collection
 
112
        # When we make readonly indices, we need this.
 
113
        self.index_class = pack_collection._index_class
 
114
        # where should the new pack be opened
 
115
        self.upload_transport = pack_collection._upload_transport
 
116
        # where are indices written out to
 
117
        self.index_transport = pack_collection._index_transport
 
118
        # where is the pack renamed to when it is finished?
 
119
        self.pack_transport = pack_collection._pack_transport
 
120
        # What file mode to upload the pack and indices with.
 
121
        self._file_mode = file_mode
 
122
        # tracks the content written to the .pack file.
 
123
        self._hash = osutils.md5()
 
124
        # a four-tuple with the length in bytes of the indices, once the pack
 
125
        # is finalised. (rev, inv, text, sigs)
 
126
        self.index_sizes = None
 
127
        # How much data to cache when writing packs. Note that this is not
 
128
        # synchronised with reads, because it's not in the transport layer, so
 
129
        # is not safe unless the client knows it won't be reading from the pack
 
130
        # under creation.
 
131
        self._cache_limit = 0
 
132
        # the temporary pack file name.
 
133
        self.random_name = osutils.rand_chars(20) + upload_suffix
 
134
        # when was this pack started ?
 
135
        self.start_time = time.time()
 
136
        # open an output stream for the data added to the pack.
 
137
        self.write_stream = self.upload_transport.open_write_stream(
 
138
            self.random_name, mode=self._file_mode)
 
139
        if 'pack' in debug.debug_flags:
 
140
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
 
141
                time.ctime(), self.upload_transport.base, self.random_name,
 
142
                time.time() - self.start_time)
 
143
        # A list of byte sequences to be written to the new pack, and the
 
144
        # aggregate size of them.  Stored as a list rather than separate
 
145
        # variables so that the _write_data closure below can update them.
 
146
        self._buffer = [[], 0]
 
147
        # create a callable for adding data
 
148
        #
 
149
        # robertc says- this is a closure rather than a method on the object
 
150
        # so that the variables are locals, and faster than accessing object
 
151
        # members.
 
152
        def _write_data(bytes, flush=False, _buffer=self._buffer,
 
153
            _write=self.write_stream.write, _update=self._hash.update):
 
154
            _buffer[0].append(bytes)
 
155
            _buffer[1] += len(bytes)
 
156
            # buffer cap
 
157
            if _buffer[1] > self._cache_limit or flush:
 
158
                bytes = ''.join(_buffer[0])
 
159
                _write(bytes)
 
160
                _update(bytes)
 
161
                _buffer[:] = [[], 0]
 
162
        # expose this on self, for the occasion when clients want to add data.
 
163
        self._write_data = _write_data
 
164
        # a pack writer object to serialise pack records.
 
165
        self._writer = pack.ContainerWriter(self._write_data)
 
166
        self._writer.begin()
 
167
        # what state is the pack in? (open, finished, aborted)
 
168
        self._state = 'open'
 
169
        # no name until we finish writing the content
 
170
        self.name = None
 
171
 
 
172
    def _check_references(self):
 
173
        """Make sure our external references are present.
 
174
 
 
175
        Packs are allowed to have deltas whose base is not in the pack, but it
 
176
        must be present somewhere in this collection.  It is not allowed to
 
177
        have deltas based on a fallback repository.
 
178
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
 
179
        """
 
180
        # Groupcompress packs don't have any external references, arguably CHK
 
181
        # pages have external references, but we cannot 'cheaply' determine
 
182
        # them without actually walking all of the chk pages.
 
183
 
 
184
 
 
185
class ResumedGCPack(ResumedPack):
 
186
 
 
187
    def _check_references(self):
 
188
        """Make sure our external compression parents are present."""
 
189
        # See GCPack._check_references for why this is empty
 
190
 
 
191
    def _get_external_refs(self, index):
 
192
        # GC repositories don't have compression parents external to a given
 
193
        # pack file
 
194
        return set()
 
195
 
 
196
 
 
197
class GCCHKPacker(Packer):
 
198
    """This class understand what it takes to collect a GCCHK repo."""
 
199
 
 
200
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
 
201
                 reload_func=None):
 
202
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
 
203
                                          revision_ids=revision_ids,
 
204
                                          reload_func=reload_func)
 
205
        self._pack_collection = pack_collection
 
206
        # ATM, We only support this for GCCHK repositories
 
207
        if pack_collection.chk_index is None:
 
208
            raise AssertionError('pack_collection.chk_index should not be None')
 
209
        self._gather_text_refs = False
 
210
        self._chk_id_roots = []
 
211
        self._chk_p_id_roots = []
 
212
        self._text_refs = None
 
213
        # set by .pack() if self.revision_ids is not None
 
214
        self.revision_keys = None
 
215
 
 
216
    def _get_progress_stream(self, source_vf, keys, message, pb):
 
217
        def pb_stream():
 
218
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
219
            for idx, record in enumerate(substream):
 
220
                if pb is not None:
 
221
                    pb.update(message, idx + 1, len(keys))
 
222
                yield record
 
223
        return pb_stream()
 
224
 
 
225
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
 
226
        """Filter the texts of inventories, to find the chk pages."""
 
227
        total_keys = len(keys)
 
228
        def _filtered_inv_stream():
 
229
            id_roots_set = set()
 
230
            p_id_roots_set = set()
 
231
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
232
            for idx, record in enumerate(stream):
 
233
                # Inventories should always be with revisions; assume success.
 
234
                bytes = record.get_bytes_as('fulltext')
 
235
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
236
                                                             record.key)
 
237
                if pb is not None:
 
238
                    pb.update('inv', idx, total_keys)
 
239
                key = chk_inv.id_to_entry.key()
 
240
                if key not in id_roots_set:
 
241
                    self._chk_id_roots.append(key)
 
242
                    id_roots_set.add(key)
 
243
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
244
                if p_id_map is None:
 
245
                    raise AssertionError('Parent id -> file_id map not set')
 
246
                key = p_id_map.key()
 
247
                if key not in p_id_roots_set:
 
248
                    p_id_roots_set.add(key)
 
249
                    self._chk_p_id_roots.append(key)
 
250
                yield record
 
251
            # We have finished processing all of the inventory records, we
 
252
            # don't need these sets anymore
 
253
            id_roots_set.clear()
 
254
            p_id_roots_set.clear()
 
255
        return _filtered_inv_stream()
 
256
 
 
257
    def _get_chk_streams(self, source_vf, keys, pb=None):
 
258
        # We want to stream the keys from 'id_roots', and things they
 
259
        # reference, and then stream things from p_id_roots and things they
 
260
        # reference, and then any remaining keys that we didn't get to.
 
261
 
 
262
        # We also group referenced texts together, so if one root references a
 
263
        # text with prefix 'a', and another root references a node with prefix
 
264
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
 
265
        # This keeps 'similar' nodes together.
 
266
 
 
267
        # Note: We probably actually want multiple streams here, to help the
 
268
        #       client understand that the different levels won't compress well
 
269
        #       against each other.
 
270
        #       Test the difference between using one Group per level, and
 
271
        #       using 1 Group per prefix. (so '' (root) would get a group, then
 
272
        #       all the references to search-key 'a' would get a group, etc.)
 
273
        total_keys = len(keys)
 
274
        remaining_keys = set(keys)
 
275
        counter = [0]
 
276
        if self._gather_text_refs:
 
277
            self._text_refs = set()
 
278
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
 
279
            cur_keys = root_keys
 
280
            while cur_keys:
 
281
                keys_by_search_prefix = {}
 
282
                remaining_keys.difference_update(cur_keys)
 
283
                next_keys = set()
 
284
                def handle_internal_node(node):
 
285
                    for prefix, value in viewitems(node._items):
 
286
                        # We don't want to request the same key twice, and we
 
287
                        # want to order it by the first time it is seen.
 
288
                        # Even further, we don't want to request a key which is
 
289
                        # not in this group of pack files (it should be in the
 
290
                        # repo, but it doesn't have to be in the group being
 
291
                        # packed.)
 
292
                        # TODO: consider how to treat externally referenced chk
 
293
                        #       pages as 'external_references' so that we
 
294
                        #       always fill them in for stacked branches
 
295
                        if value not in next_keys and value in remaining_keys:
 
296
                            keys_by_search_prefix.setdefault(prefix,
 
297
                                []).append(value)
 
298
                            next_keys.add(value)
 
299
                def handle_leaf_node(node):
 
300
                    # Store is None, because we know we have a LeafNode, and we
 
301
                    # just want its entries
 
302
                    for file_id, bytes in node.iteritems(None):
 
303
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))
 
304
                def next_stream():
 
305
                    stream = source_vf.get_record_stream(cur_keys,
 
306
                                                         'as-requested', True)
 
307
                    for record in stream:
 
308
                        if record.storage_kind == 'absent':
 
309
                            # An absent CHK record: we assume that the missing
 
310
                            # record is in a different pack - e.g. a page not
 
311
                            # altered by the commit we're packing.
 
312
                            continue
 
313
                        bytes = record.get_bytes_as('fulltext')
 
314
                        # We don't care about search_key_func for this code,
 
315
                        # because we only care about external references.
 
316
                        node = chk_map._deserialise(bytes, record.key,
 
317
                                                    search_key_func=None)
 
318
                        common_base = node._search_prefix
 
319
                        if isinstance(node, chk_map.InternalNode):
 
320
                            handle_internal_node(node)
 
321
                        elif parse_leaf_nodes:
 
322
                            handle_leaf_node(node)
 
323
                        counter[0] += 1
 
324
                        if pb is not None:
 
325
                            pb.update('chk node', counter[0], total_keys)
 
326
                        yield record
 
327
                yield next_stream()
 
328
                # Double check that we won't be emitting any keys twice
 
329
                # If we get rid of the pre-calculation of all keys, we could
 
330
                # turn this around and do
 
331
                # next_keys.difference_update(seen_keys)
 
332
                # However, we also may have references to chk pages in another
 
333
                # pack file during autopack. We filter earlier, so we should no
 
334
                # longer need to do this
 
335
                # next_keys = next_keys.intersection(remaining_keys)
 
336
                cur_keys = []
 
337
                for prefix in sorted(keys_by_search_prefix):
 
338
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
 
339
        for stream in _get_referenced_stream(self._chk_id_roots,
 
340
                                             self._gather_text_refs):
 
341
            yield stream
 
342
        del self._chk_id_roots
 
343
        # while it isn't really possible for chk_id_roots to not be in the
 
344
        # local group of packs, it is possible that the tree shape has not
 
345
        # changed recently, so we need to filter _chk_p_id_roots by the
 
346
        # available keys
 
347
        chk_p_id_roots = [key for key in self._chk_p_id_roots
 
348
                          if key in remaining_keys]
 
349
        del self._chk_p_id_roots
 
350
        for stream in _get_referenced_stream(chk_p_id_roots, False):
 
351
            yield stream
 
352
        if remaining_keys:
 
353
            trace.mutter('There were %d keys in the chk index, %d of which'
 
354
                         ' were not referenced', total_keys,
 
355
                         len(remaining_keys))
 
356
            if self.revision_ids is None:
 
357
                stream = source_vf.get_record_stream(remaining_keys,
 
358
                                                     'unordered', True)
 
359
                yield stream
 
360
 
 
361
    def _build_vf(self, index_name, parents, delta, for_write=False):
 
362
        """Build a VersionedFiles instance on top of this group of packs."""
 
363
        index_name = index_name + '_index'
 
364
        index_to_pack = {}
 
365
        access = _DirectPackAccess(index_to_pack,
 
366
                                   reload_func=self._reload_func)
 
367
        if for_write:
 
368
            # Use new_pack
 
369
            if self.new_pack is None:
 
370
                raise AssertionError('No new pack has been set')
 
371
            index = getattr(self.new_pack, index_name)
 
372
            index_to_pack[index] = self.new_pack.access_tuple()
 
373
            index.set_optimize(for_size=True)
 
374
            access.set_writer(self.new_pack._writer, index,
 
375
                              self.new_pack.access_tuple())
 
376
            add_callback = index.add_nodes
 
377
        else:
 
378
            indices = []
 
379
            for pack in self.packs:
 
380
                sub_index = getattr(pack, index_name)
 
381
                index_to_pack[sub_index] = pack.access_tuple()
 
382
                indices.append(sub_index)
 
383
            index = _mod_index.CombinedGraphIndex(indices)
 
384
            add_callback = None
 
385
        vf = GroupCompressVersionedFiles(
 
386
            _GCGraphIndex(index,
 
387
                          add_callback=add_callback,
 
388
                          parents=parents,
 
389
                          is_locked=self._pack_collection.repo.is_locked),
 
390
            access=access,
 
391
            delta=delta)
 
392
        return vf
 
393
 
 
394
    def _build_vfs(self, index_name, parents, delta):
 
395
        """Build the source and target VersionedFiles."""
 
396
        source_vf = self._build_vf(index_name, parents,
 
397
                                   delta, for_write=False)
 
398
        target_vf = self._build_vf(index_name, parents,
 
399
                                   delta, for_write=True)
 
400
        return source_vf, target_vf
 
401
 
 
402
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
 
403
                     pb_offset):
 
404
        trace.mutter('repacking %d %s', len(keys), message)
 
405
        self.pb.update('repacking %s' % (message,), pb_offset)
 
406
        child_pb = ui.ui_factory.nested_progress_bar()
 
407
        try:
 
408
            stream = vf_to_stream(source_vf, keys, message, child_pb)
 
409
            for _ in target_vf._insert_record_stream(stream,
 
410
                                                     random_id=True,
 
411
                                                     reuse_blocks=False):
 
412
                pass
 
413
        finally:
 
414
            child_pb.finished()
 
415
 
 
416
    def _copy_revision_texts(self):
 
417
        source_vf, target_vf = self._build_vfs('revision', True, False)
 
418
        if not self.revision_keys:
 
419
            # We are doing a full fetch, aka 'pack'
 
420
            self.revision_keys = source_vf.keys()
 
421
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
422
                          'revisions', self._get_progress_stream, 1)
 
423
 
 
424
    def _copy_inventory_texts(self):
 
425
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
426
        # It is not sufficient to just use self.revision_keys, as stacked
 
427
        # repositories can have more inventories than they have revisions.
 
428
        # One alternative would be to do something with
 
429
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
 
430
        # than this.
 
431
        inventory_keys = source_vf.keys()
 
432
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
 
433
        if missing_inventories:
 
434
            # Go back to the original repo, to see if these are really missing
 
435
            # https://bugs.launchpad.net/bzr/+bug/437003
 
436
            # If we are packing a subset of the repo, it is fine to just have
 
437
            # the data in another Pack file, which is not included in this pack
 
438
            # operation.
 
439
            inv_index = self._pack_collection.repo.inventories._index
 
440
            pmap = inv_index.get_parent_map(missing_inventories)
 
441
            really_missing = missing_inventories.difference(pmap)
 
442
            if really_missing:
 
443
                missing_inventories = sorted(really_missing)
 
444
                raise ValueError('We are missing inventories for revisions: %s'
 
445
                    % (missing_inventories,))
 
446
        self._copy_stream(source_vf, target_vf, inventory_keys,
 
447
                          'inventories', self._get_filtered_inv_stream, 2)
 
448
 
 
449
    def _get_chk_vfs_for_copy(self):
 
450
        return self._build_vfs('chk', False, False)
 
451
 
 
452
    def _copy_chk_texts(self):
 
453
        source_vf, target_vf = self._get_chk_vfs_for_copy()
 
454
        # TODO: This is technically spurious... if it is a performance issue,
 
455
        #       remove it
 
456
        total_keys = source_vf.keys()
 
457
        trace.mutter('repacking chk: %d id_to_entry roots,'
 
458
                     ' %d p_id_map roots, %d total keys',
 
459
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
 
460
                     len(total_keys))
 
461
        self.pb.update('repacking chk', 3)
 
462
        child_pb = ui.ui_factory.nested_progress_bar()
 
463
        try:
 
464
            for stream in self._get_chk_streams(source_vf, total_keys,
 
465
                                                pb=child_pb):
 
466
                for _ in target_vf._insert_record_stream(stream,
 
467
                                                         random_id=True,
 
468
                                                         reuse_blocks=False):
 
469
                    pass
 
470
        finally:
 
471
            child_pb.finished()
 
472
 
 
473
    def _copy_text_texts(self):
 
474
        source_vf, target_vf = self._build_vfs('text', True, True)
 
475
        # XXX: We don't walk the chk map to determine referenced (file_id,
 
476
        #      revision_id) keys.  We don't do it yet because you really need
 
477
        #      to filter out the ones that are present in the parents of the
 
478
        #      rev just before the ones you are copying, otherwise the filter
 
479
        #      is grabbing too many keys...
 
480
        text_keys = source_vf.keys()
 
481
        self._copy_stream(source_vf, target_vf, text_keys,
 
482
                          'texts', self._get_progress_stream, 4)
 
483
 
 
484
    def _copy_signature_texts(self):
 
485
        source_vf, target_vf = self._build_vfs('signature', False, False)
 
486
        signature_keys = source_vf.keys()
 
487
        signature_keys.intersection(self.revision_keys)
 
488
        self._copy_stream(source_vf, target_vf, signature_keys,
 
489
                          'signatures', self._get_progress_stream, 5)
 
490
 
 
491
    def _create_pack_from_packs(self):
 
492
        self.pb.update('repacking', 0, 7)
 
493
        self.new_pack = self.open_pack()
 
494
        # Is this necessary for GC ?
 
495
        self.new_pack.set_write_cache_size(1024*1024)
 
496
        self._copy_revision_texts()
 
497
        self._copy_inventory_texts()
 
498
        self._copy_chk_texts()
 
499
        self._copy_text_texts()
 
500
        self._copy_signature_texts()
 
501
        self.new_pack._check_references()
 
502
        if not self._use_pack(self.new_pack):
 
503
            self.new_pack.abort()
 
504
            return None
 
505
        self.new_pack.finish_content()
 
506
        if len(self.packs) == 1:
 
507
            old_pack = self.packs[0]
 
508
            if old_pack.name == self.new_pack._hash.hexdigest():
 
509
                # The single old pack was already optimally packed.
 
510
                trace.mutter('single pack %s was already optimally packed',
 
511
                    old_pack.name)
 
512
                self.new_pack.abort()
 
513
                return None
 
514
        self.pb.update('finishing repack', 6, 7)
 
515
        self.new_pack.finish()
 
516
        self._pack_collection.allocate(self.new_pack)
 
517
        return self.new_pack
 
518
 
 
519
 
 
520
class GCCHKReconcilePacker(GCCHKPacker):
 
521
    """A packer which regenerates indices etc as it copies.
 
522
 
 
523
    This is used by ``brz reconcile`` to cause parent text pointers to be
 
524
    regenerated.
 
525
    """
 
526
 
 
527
    def __init__(self, *args, **kwargs):
 
528
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
 
529
        self._data_changed = False
 
530
        self._gather_text_refs = True
 
531
 
 
532
    def _copy_inventory_texts(self):
 
533
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
534
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
535
                          'inventories', self._get_filtered_inv_stream, 2)
 
536
        if source_vf.keys() != self.revision_keys:
 
537
            self._data_changed = True
 
538
 
 
539
    def _copy_text_texts(self):
 
540
        """generate what texts we should have and then copy."""
 
541
        source_vf, target_vf = self._build_vfs('text', True, True)
 
542
        trace.mutter('repacking %d texts', len(self._text_refs))
 
543
        self.pb.update("repacking texts", 4)
 
544
        # we have three major tasks here:
 
545
        # 1) generate the ideal index
 
546
        repo = self._pack_collection.repo
 
547
        # We want the one we just wrote, so base it on self.new_pack
 
548
        revision_vf = self._build_vf('revision', True, False, for_write=True)
 
549
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
 
550
        # Strip keys back into revision_ids.
 
551
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
 
552
                         for k, parents in viewitems(ancestor_keys))
 
553
        del ancestor_keys
 
554
        # TODO: _generate_text_key_index should be much cheaper to generate from
 
555
        #       a chk repository, rather than the current implementation
 
556
        ideal_index = repo._generate_text_key_index(None, ancestors)
 
557
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
 
558
        # 2) generate a keys list that contains all the entries that can
 
559
        #    be used as-is, with corrected parents.
 
560
        ok_keys = []
 
561
        new_parent_keys = {} # (key, parent_keys)
 
562
        discarded_keys = []
 
563
        NULL_REVISION = _mod_revision.NULL_REVISION
 
564
        for key in self._text_refs:
 
565
            # 0 - index
 
566
            # 1 - key
 
567
            # 2 - value
 
568
            # 3 - refs
 
569
            try:
 
570
                ideal_parents = tuple(ideal_index[key])
 
571
            except KeyError:
 
572
                discarded_keys.append(key)
 
573
                self._data_changed = True
 
574
            else:
 
575
                if ideal_parents == (NULL_REVISION,):
 
576
                    ideal_parents = ()
 
577
                source_parents = file_id_parent_map[key]
 
578
                if ideal_parents == source_parents:
 
579
                    # no change needed.
 
580
                    ok_keys.append(key)
 
581
                else:
 
582
                    # We need to change the parent graph, but we don't need to
 
583
                    # re-insert the text (since we don't pun the compression
 
584
                    # parent with the parents list)
 
585
                    self._data_changed = True
 
586
                    new_parent_keys[key] = ideal_parents
 
587
        # we're finished with some data.
 
588
        del ideal_index
 
589
        del file_id_parent_map
 
590
        # 3) bulk copy the data, updating records than need it
 
591
        def _update_parents_for_texts():
 
592
            stream = source_vf.get_record_stream(self._text_refs,
 
593
                'groupcompress', False)
 
594
            for record in stream:
 
595
                if record.key in new_parent_keys:
 
596
                    record.parents = new_parent_keys[record.key]
 
597
                yield record
 
598
        target_vf.insert_record_stream(_update_parents_for_texts())
 
599
 
 
600
    def _use_pack(self, new_pack):
 
601
        """Override _use_pack to check for reconcile having changed content."""
 
602
        return new_pack.data_inserted() and self._data_changed
 
603
 
 
604
 
 
605
class GCCHKCanonicalizingPacker(GCCHKPacker):
 
606
    """A packer that ensures inventories have canonical-form CHK maps.
 
607
    
 
608
    Ideally this would be part of reconcile, but it's very slow and rarely
 
609
    needed.  (It repairs repositories affected by
 
610
    https://bugs.launchpad.net/bzr/+bug/522637).
 
611
    """
 
612
 
 
613
    def __init__(self, *args, **kwargs):
 
614
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
 
615
        self._data_changed = False
 
616
 
 
617
    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
 
618
        """Create and exhaust a stream, but don't insert it.
 
619
 
 
620
        This is useful to get the side-effects of generating a stream.
 
621
        """
 
622
        self.pb.update('scanning %s' % (message,), pb_offset)
 
623
        child_pb = ui.ui_factory.nested_progress_bar()
 
624
        try:
 
625
            list(vf_to_stream(source_vf, keys, message, child_pb))
 
626
        finally:
 
627
            child_pb.finished()
 
628
 
 
629
    def _copy_inventory_texts(self):
 
630
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
631
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
 
632
        inventory_keys = source_vf.keys()
 
633
        # First, copy the existing CHKs on the assumption that most of them
 
634
        # will be correct.  This will save us from having to reinsert (and
 
635
        # recompress) these records later at the cost of perhaps preserving a
 
636
        # few unused CHKs. 
 
637
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
 
638
        # variables needed by GCCHKPacker._copy_chk_texts.)
 
639
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
 
640
                self._get_filtered_inv_stream, 2)
 
641
        GCCHKPacker._copy_chk_texts(self)
 
642
        # Now copy and fix the inventories, and any regenerated CHKs.
 
643
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
 
644
            return self._get_filtered_canonicalizing_inv_stream(
 
645
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
 
646
        self._copy_stream(source_vf, target_vf, inventory_keys,
 
647
                          'inventories', chk_canonicalizing_inv_stream, 4)
 
648
 
 
649
    def _copy_chk_texts(self):
 
650
        # No-op; in this class this happens during _copy_inventory_texts.
 
651
        pass
 
652
 
 
653
    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
 
654
            pb=None, source_chk_vf=None, target_chk_vf=None):
 
655
        """Filter the texts of inventories, regenerating CHKs to make sure they
 
656
        are canonical.
 
657
        """
 
658
        total_keys = len(keys)
 
659
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
 
660
        def _filtered_inv_stream():
 
661
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
662
            search_key_name = None
 
663
            for idx, record in enumerate(stream):
 
664
                # Inventories should always be with revisions; assume success.
 
665
                bytes = record.get_bytes_as('fulltext')
 
666
                chk_inv = inventory.CHKInventory.deserialise(
 
667
                    source_chk_vf, bytes, record.key)
 
668
                if pb is not None:
 
669
                    pb.update('inv', idx, total_keys)
 
670
                chk_inv.id_to_entry._ensure_root()
 
671
                if search_key_name is None:
 
672
                    # Find the name corresponding to the search_key_func
 
673
                    search_key_reg = chk_map.search_key_registry
 
674
                    for search_key_name, func in viewitems(search_key_reg):
 
675
                        if func == chk_inv.id_to_entry._search_key_func:
 
676
                            break
 
677
                canonical_inv = inventory.CHKInventory.from_inventory(
 
678
                    target_chk_vf, chk_inv,
 
679
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
 
680
                    search_key_name=search_key_name)
 
681
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
 
682
                    trace.mutter(
 
683
                        'Non-canonical CHK map for id_to_entry of inv: %s '
 
684
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
 
685
                        chk_inv.id_to_entry.key()[0],
 
686
                        canonical_inv.id_to_entry.key()[0]))
 
687
                    self._data_changed = True
 
688
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
689
                p_id_map._ensure_root()
 
690
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
 
691
                if p_id_map.key() != canon_p_id_map.key():
 
692
                    trace.mutter(
 
693
                        'Non-canonical CHK map for parent_id_to_basename of '
 
694
                        'inv: %s (root is %s, should be %s)'
 
695
                        % (chk_inv.revision_id, p_id_map.key()[0],
 
696
                           canon_p_id_map.key()[0]))
 
697
                    self._data_changed = True
 
698
                yield versionedfile.ChunkedContentFactory(record.key,
 
699
                        record.parents, record.sha1,
 
700
                        canonical_inv.to_lines())
 
701
            # We have finished processing all of the inventory records, we
 
702
            # don't need these sets anymore
 
703
        return _filtered_inv_stream()
 
704
 
 
705
    def _use_pack(self, new_pack):
 
706
        """Override _use_pack to check for reconcile having changed content."""
 
707
        return new_pack.data_inserted() and self._data_changed
 
708
 
 
709
 
 
710
class GCRepositoryPackCollection(RepositoryPackCollection):
 
711
 
 
712
    pack_factory = GCPack
 
713
    resumed_pack_factory = ResumedGCPack
 
714
    normal_packer_class = GCCHKPacker
 
715
    optimising_packer_class = GCCHKPacker
 
716
 
 
717
    def _check_new_inventories(self):
 
718
        """Detect missing inventories or chk root entries for the new revisions
 
719
        in this write group.
 
720
 
 
721
        :returns: list of strs, summarising any problems found.  If the list is
 
722
            empty no problems were found.
 
723
        """
 
724
        # Ensure that all revisions added in this write group have:
 
725
        #   - corresponding inventories,
 
726
        #   - chk root entries for those inventories,
 
727
        #   - and any present parent inventories have their chk root
 
728
        #     entries too.
 
729
        # And all this should be independent of any fallback repository.
 
730
        problems = []
 
731
        key_deps = self.repo.revisions._index._key_dependencies
 
732
        new_revisions_keys = key_deps.get_new_keys()
 
733
        no_fallback_inv_index = self.repo.inventories._index
 
734
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
 
735
        no_fallback_texts_index = self.repo.texts._index
 
736
        inv_parent_map = no_fallback_inv_index.get_parent_map(
 
737
            new_revisions_keys)
 
738
        # Are any inventories for corresponding to the new revisions missing?
 
739
        corresponding_invs = set(inv_parent_map)
 
740
        missing_corresponding = set(new_revisions_keys)
 
741
        missing_corresponding.difference_update(corresponding_invs)
 
742
        if missing_corresponding:
 
743
            problems.append("inventories missing for revisions %s" %
 
744
                (sorted(missing_corresponding),))
 
745
            return problems
 
746
        # Are any chk root entries missing for any inventories?  This includes
 
747
        # any present parent inventories, which may be used when calculating
 
748
        # deltas for streaming.
 
749
        all_inv_keys = set(corresponding_invs)
 
750
        for parent_inv_keys in viewvalues(inv_parent_map):
 
751
            all_inv_keys.update(parent_inv_keys)
 
752
        # Filter out ghost parents.
 
753
        all_inv_keys.intersection_update(
 
754
            no_fallback_inv_index.get_parent_map(all_inv_keys))
 
755
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
 
756
            corresponding_invs)
 
757
        all_missing = set()
 
758
        inv_ids = [key[-1] for key in all_inv_keys]
 
759
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
 
760
        root_key_info = _build_interesting_key_sets(
 
761
            self.repo, inv_ids, parent_invs_only_ids)
 
762
        expected_chk_roots = root_key_info.all_keys()
 
763
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
 
764
            expected_chk_roots)
 
765
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
 
766
        if missing_chk_roots:
 
767
            problems.append(
 
768
                "missing referenced chk root keys: %s."
 
769
                "Run 'brz reconcile --canonicalize-chks' on the affected "
 
770
                "repository."
 
771
                % (sorted(missing_chk_roots),))
 
772
            # Don't bother checking any further.
 
773
            return problems
 
774
        # Find all interesting chk_bytes records, and make sure they are
 
775
        # present, as well as the text keys they reference.
 
776
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
 
777
        chk_bytes_no_fallbacks._search_key_func = \
 
778
            self.repo.chk_bytes._search_key_func
 
779
        chk_diff = chk_map.iter_interesting_nodes(
 
780
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
 
781
            root_key_info.uninteresting_root_keys)
 
782
        text_keys = set()
 
783
        try:
 
784
            for record in _filter_text_keys(chk_diff, text_keys,
 
785
                                            chk_map._bytes_to_text_key):
 
786
                pass
 
787
        except errors.NoSuchRevision as e:
 
788
            # XXX: It would be nice if we could give a more precise error here.
 
789
            problems.append("missing chk node(s) for id_to_entry maps")
 
790
        chk_diff = chk_map.iter_interesting_nodes(
 
791
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
 
792
            root_key_info.uninteresting_pid_root_keys)
 
793
        try:
 
794
            for interesting_rec, interesting_map in chk_diff:
 
795
                pass
 
796
        except errors.NoSuchRevision as e:
 
797
            problems.append(
 
798
                "missing chk node(s) for parent_id_basename_to_file_id maps")
 
799
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
 
800
        missing_text_keys = text_keys.difference(present_text_keys)
 
801
        if missing_text_keys:
 
802
            problems.append("missing text keys: %r"
 
803
                % (sorted(missing_text_keys),))
 
804
        return problems
 
805
 
 
806
 
 
807
class CHKInventoryRepository(PackRepository):
 
808
    """subclass of PackRepository that uses CHK based inventories."""
 
809
 
 
810
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
811
        _serializer):
 
812
        """Overridden to change pack collection class."""
 
813
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
 
814
            control_files, _commit_builder_class, _serializer)
 
815
        index_transport = self._transport.clone('indices')
 
816
        self._pack_collection = GCRepositoryPackCollection(self,
 
817
            self._transport, index_transport,
 
818
            self._transport.clone('upload'),
 
819
            self._transport.clone('packs'),
 
820
            _format.index_builder_class,
 
821
            _format.index_class,
 
822
            use_chk_index=self._format.supports_chks,
 
823
            )
 
824
        self.inventories = GroupCompressVersionedFiles(
 
825
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
826
                add_callback=self._pack_collection.inventory_index.add_callback,
 
827
                parents=True, is_locked=self.is_locked,
 
828
                inconsistency_fatal=False),
 
829
            access=self._pack_collection.inventory_index.data_access)
 
830
        self.revisions = GroupCompressVersionedFiles(
 
831
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
832
                add_callback=self._pack_collection.revision_index.add_callback,
 
833
                parents=True, is_locked=self.is_locked,
 
834
                track_external_parent_refs=True, track_new_keys=True),
 
835
            access=self._pack_collection.revision_index.data_access,
 
836
            delta=False)
 
837
        self.signatures = GroupCompressVersionedFiles(
 
838
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
839
                add_callback=self._pack_collection.signature_index.add_callback,
 
840
                parents=False, is_locked=self.is_locked,
 
841
                inconsistency_fatal=False),
 
842
            access=self._pack_collection.signature_index.data_access,
 
843
            delta=False)
 
844
        self.texts = GroupCompressVersionedFiles(
 
845
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
846
                add_callback=self._pack_collection.text_index.add_callback,
 
847
                parents=True, is_locked=self.is_locked,
 
848
                inconsistency_fatal=False),
 
849
            access=self._pack_collection.text_index.data_access)
 
850
        # No parents, individual CHK pages don't have specific ancestry
 
851
        self.chk_bytes = GroupCompressVersionedFiles(
 
852
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
853
                add_callback=self._pack_collection.chk_index.add_callback,
 
854
                parents=False, is_locked=self.is_locked,
 
855
                inconsistency_fatal=False),
 
856
            access=self._pack_collection.chk_index.data_access)
 
857
        search_key_name = self._format._serializer.search_key_name
 
858
        search_key_func = chk_map.search_key_registry.get(search_key_name)
 
859
        self.chk_bytes._search_key_func = search_key_func
 
860
        # True when the repository object is 'write locked' (as opposed to the
 
861
        # physical lock only taken out around changes to the pack-names list.)
 
862
        # Another way to represent this would be a decorator around the control
 
863
        # files object that presents logical locks as physical ones - if this
 
864
        # gets ugly consider that alternative design. RBC 20071011
 
865
        self._write_lock_count = 0
 
866
        self._transaction = None
 
867
        # for tests
 
868
        self._reconcile_does_inventory_gc = True
 
869
        self._reconcile_fixes_text_parents = True
 
870
        self._reconcile_backsup_inventory = False
 
871
 
 
872
    def _add_inventory_checked(self, revision_id, inv, parents):
 
873
        """Add inv to the repository after checking the inputs.
 
874
 
 
875
        This function can be overridden to allow different inventory styles.
 
876
 
 
877
        :seealso: add_inventory, for the contract.
 
878
        """
 
879
        # make inventory
 
880
        serializer = self._format._serializer
 
881
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
 
882
            maximum_size=serializer.maximum_size,
 
883
            search_key_name=serializer.search_key_name)
 
884
        inv_lines = result.to_lines()
 
885
        return self._inventory_add_lines(revision_id, parents,
 
886
            inv_lines, check_content=False)
 
887
 
 
888
    def _create_inv_from_null(self, delta, revision_id):
 
889
        """This will mutate new_inv directly.
 
890
 
 
891
        This is a simplified form of create_by_apply_delta which knows that all
 
892
        the old values must be None, so everything is a create.
 
893
        """
 
894
        serializer = self._format._serializer
 
895
        new_inv = inventory.CHKInventory(serializer.search_key_name)
 
896
        new_inv.revision_id = revision_id
 
897
        entry_to_bytes = new_inv._entry_to_bytes
 
898
        id_to_entry_dict = {}
 
899
        parent_id_basename_dict = {}
 
900
        for old_path, new_path, file_id, entry in delta:
 
901
            if old_path is not None:
 
902
                raise ValueError('Invalid delta, somebody tried to delete %r'
 
903
                                 ' from the NULL_REVISION'
 
904
                                 % ((old_path, file_id),))
 
905
            if new_path is None:
 
906
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
 
907
                                 ' no new_path %r' % (file_id,))
 
908
            if new_path == '':
 
909
                new_inv.root_id = file_id
 
910
                parent_id_basename_key = StaticTuple('', '').intern()
 
911
            else:
 
912
                utf8_entry_name = entry.name.encode('utf-8')
 
913
                parent_id_basename_key = StaticTuple(entry.parent_id,
 
914
                                                     utf8_entry_name).intern()
 
915
            new_value = entry_to_bytes(entry)
 
916
            # Populate Caches?
 
917
            # new_inv._path_to_fileid_cache[new_path] = file_id
 
918
            key = StaticTuple(file_id).intern()
 
919
            id_to_entry_dict[key] = new_value
 
920
            parent_id_basename_dict[parent_id_basename_key] = file_id
 
921
 
 
922
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
 
923
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
 
924
        return new_inv
 
925
 
 
926
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
 
927
                               parents, basis_inv=None, propagate_caches=False):
 
928
        """Add a new inventory expressed as a delta against another revision.
 
929
 
 
930
        :param basis_revision_id: The inventory id the delta was created
 
931
            against.
 
932
        :param delta: The inventory delta (see Inventory.apply_delta for
 
933
            details).
 
934
        :param new_revision_id: The revision id that the inventory is being
 
935
            added for.
 
936
        :param parents: The revision ids of the parents that revision_id is
 
937
            known to have and are in the repository already. These are supplied
 
938
            for repositories that depend on the inventory graph for revision
 
939
            graph access, as well as for those that pun ancestry with delta
 
940
            compression.
 
941
        :param basis_inv: The basis inventory if it is already known,
 
942
            otherwise None.
 
943
        :param propagate_caches: If True, the caches for this inventory are
 
944
          copied to and updated for the result if possible.
 
945
 
 
946
        :returns: (validator, new_inv)
 
947
            The validator(which is a sha1 digest, though what is sha'd is
 
948
            repository format specific) of the serialized inventory, and the
 
949
            resulting inventory.
 
950
        """
 
951
        if not self.is_in_write_group():
 
952
            raise AssertionError("%r not in write group" % (self,))
 
953
        _mod_revision.check_not_reserved_id(new_revision_id)
 
954
        basis_tree = None
 
955
        if basis_inv is None:
 
956
            if basis_revision_id == _mod_revision.NULL_REVISION:
 
957
                new_inv = self._create_inv_from_null(delta, new_revision_id)
 
958
                if new_inv.root_id is None:
 
959
                    raise errors.RootMissing()
 
960
                inv_lines = new_inv.to_lines()
 
961
                return self._inventory_add_lines(new_revision_id, parents,
 
962
                    inv_lines, check_content=False), new_inv
 
963
            else:
 
964
                basis_tree = self.revision_tree(basis_revision_id)
 
965
                basis_tree.lock_read()
 
966
                basis_inv = basis_tree.root_inventory
 
967
        try:
 
968
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
 
969
                propagate_caches=propagate_caches)
 
970
            inv_lines = result.to_lines()
 
971
            return self._inventory_add_lines(new_revision_id, parents,
 
972
                inv_lines, check_content=False), result
 
973
        finally:
 
974
            if basis_tree is not None:
 
975
                basis_tree.unlock()
 
976
 
 
977
    def _deserialise_inventory(self, revision_id, bytes):
 
978
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
 
979
            (revision_id,))
 
980
 
 
981
    def _iter_inventories(self, revision_ids, ordering):
 
982
        """Iterate over many inventory objects."""
 
983
        if ordering is None:
 
984
            ordering = 'unordered'
 
985
        keys = [(revision_id,) for revision_id in revision_ids]
 
986
        stream = self.inventories.get_record_stream(keys, ordering, True)
 
987
        texts = {}
 
988
        for record in stream:
 
989
            if record.storage_kind != 'absent':
 
990
                texts[record.key] = record.get_bytes_as('fulltext')
 
991
            else:
 
992
                texts[record.key] = None
 
993
        for key in keys:
 
994
            bytes = texts[key]
 
995
            if bytes is None:
 
996
                yield (None, key[-1])
 
997
            else:
 
998
                yield (inventory.CHKInventory.deserialise(
 
999
                    self.chk_bytes, bytes, key), key[-1])
 
1000
 
 
1001
    def _get_inventory_xml(self, revision_id):
 
1002
        """Get serialized inventory as a string."""
 
1003
        # Without a native 'xml' inventory, this method doesn't make sense.
 
1004
        # However older working trees, and older bundles want it - so we supply
 
1005
        # it allowing _get_inventory_xml to work. Bundles currently use the
 
1006
        # serializer directly; this also isn't ideal, but there isn't an xml
 
1007
        # iteration interface offered at all for repositories.
 
1008
        return self._serializer.write_inventory_to_string(
 
1009
            self.get_inventory(revision_id))
 
1010
 
 
1011
    def _find_present_inventory_keys(self, revision_keys):
 
1012
        parent_map = self.inventories.get_parent_map(revision_keys)
 
1013
        present_inventory_keys = set(k for k in parent_map)
 
1014
        return present_inventory_keys
 
1015
 
 
1016
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
 
1017
        """Find the file ids and versions affected by revisions.
 
1018
 
 
1019
        :param revisions: an iterable containing revision ids.
 
1020
        :param _inv_weave: The inventory weave from this repository or None.
 
1021
            If None, the inventory weave will be opened automatically.
 
1022
        :return: a dictionary mapping altered file-ids to an iterable of
 
1023
            revision_ids. Each altered file-ids has the exact revision_ids that
 
1024
            altered it listed explicitly.
 
1025
        """
 
1026
        rich_root = self.supports_rich_root()
 
1027
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
 
1028
        file_id_revisions = {}
 
1029
        pb = ui.ui_factory.nested_progress_bar()
 
1030
        try:
 
1031
            revision_keys = [(r,) for r in revision_ids]
 
1032
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
 
1033
            # TODO: instead of using _find_present_inventory_keys, change the
 
1034
            #       code paths to allow missing inventories to be tolerated.
 
1035
            #       However, we only want to tolerate missing parent
 
1036
            #       inventories, not missing inventories for revision_ids
 
1037
            present_parent_inv_keys = self._find_present_inventory_keys(
 
1038
                                        parent_keys)
 
1039
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
 
1040
            inventories_to_read = set(revision_ids)
 
1041
            inventories_to_read.update(present_parent_inv_ids)
 
1042
            root_key_info = _build_interesting_key_sets(
 
1043
                self, inventories_to_read, present_parent_inv_ids)
 
1044
            interesting_root_keys = root_key_info.interesting_root_keys
 
1045
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
 
1046
            chk_bytes = self.chk_bytes
 
1047
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
 
1048
                        interesting_root_keys, uninteresting_root_keys,
 
1049
                        pb=pb):
 
1050
                for name, bytes in items:
 
1051
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
 
1052
                    # TODO: consider interning file_id, revision_id here, or
 
1053
                    #       pushing that intern() into bytes_to_info()
 
1054
                    # TODO: rich_root should always be True here, for all
 
1055
                    #       repositories that support chk_bytes
 
1056
                    if not rich_root and name_utf8 == '':
 
1057
                        continue
 
1058
                    try:
 
1059
                        file_id_revisions[file_id].add(revision_id)
 
1060
                    except KeyError:
 
1061
                        file_id_revisions[file_id] = {revision_id}
 
1062
        finally:
 
1063
            pb.finished()
 
1064
        return file_id_revisions
 
1065
 
 
1066
    def find_text_key_references(self):
 
1067
        """Find the text key references within the repository.
 
1068
 
 
1069
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
 
1070
            to whether they were referred to by the inventory of the
 
1071
            revision_id that they contain. The inventory texts from all present
 
1072
            revision ids are assessed to generate this report.
 
1073
        """
 
1074
        # XXX: Slow version but correct: rewrite as a series of delta
 
1075
        # examinations/direct tree traversal. Note that that will require care
 
1076
        # as a common node is reachable both from the inventory that added it,
 
1077
        # and others afterwards.
 
1078
        revision_keys = self.revisions.keys()
 
1079
        result = {}
 
1080
        rich_roots = self.supports_rich_root()
 
1081
        pb = ui.ui_factory.nested_progress_bar()
 
1082
        try:
 
1083
            all_revs = self.all_revision_ids()
 
1084
            total = len(all_revs)
 
1085
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
 
1086
                pb.update("Finding text references", pos, total)
 
1087
                for _, entry in inv.iter_entries():
 
1088
                    if not rich_roots and entry.file_id == inv.root_id:
 
1089
                        continue
 
1090
                    key = (entry.file_id, entry.revision)
 
1091
                    result.setdefault(key, False)
 
1092
                    if entry.revision == inv.revision_id:
 
1093
                        result[key] = True
 
1094
            return result
 
1095
        finally:
 
1096
            pb.finished()
 
1097
 
 
1098
    @needs_write_lock
 
1099
    def reconcile_canonicalize_chks(self):
 
1100
        """Reconcile this repository to make sure all CHKs are in canonical
 
1101
        form.
 
1102
        """
 
1103
        from breezy.reconcile import PackReconciler
 
1104
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
 
1105
        reconciler.reconcile()
 
1106
        return reconciler
 
1107
 
 
1108
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
1109
        packer = GCCHKReconcilePacker(collection, packs, extension)
 
1110
        return packer.pack(pb)
 
1111
 
 
1112
    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
 
1113
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
 
1114
        return packer.pack(pb)
 
1115
 
 
1116
    def _get_source(self, to_format):
 
1117
        """Return a source for streaming from this repository."""
 
1118
        if self._format._serializer == to_format._serializer:
 
1119
            # We must be exactly the same format, otherwise stuff like the chk
 
1120
            # page layout might be different.
 
1121
            # Actually, this test is just slightly looser than exact so that
 
1122
            # CHK2 <-> 2a transfers will work.
 
1123
            return GroupCHKStreamSource(self, to_format)
 
1124
        return super(CHKInventoryRepository, self)._get_source(to_format)
 
1125
 
 
1126
    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
 
1127
        """Find revisions with different parent lists in the revision object
 
1128
        and in the index graph.
 
1129
 
 
1130
        :param revisions_iterator: None, or an iterator of (revid,
 
1131
            Revision-or-None). This iterator controls the revisions checked.
 
1132
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
 
1133
            parents-in-revision).
 
1134
        """
 
1135
        if not self.is_locked():
 
1136
            raise AssertionError()
 
1137
        vf = self.revisions
 
1138
        if revisions_iterator is None:
 
1139
            revisions_iterator = self._iter_revisions(None)
 
1140
        for revid, revision in revisions_iterator:
 
1141
            if revision is None:
 
1142
                pass
 
1143
            parent_map = vf.get_parent_map([(revid,)])
 
1144
            parents_according_to_index = tuple(parent[-1] for parent in
 
1145
                parent_map[(revid,)])
 
1146
            parents_according_to_revision = tuple(revision.parent_ids)
 
1147
            if parents_according_to_index != parents_according_to_revision:
 
1148
                yield (revid, parents_according_to_index,
 
1149
                    parents_according_to_revision)
 
1150
 
 
1151
    def _check_for_inconsistent_revision_parents(self):
 
1152
        inconsistencies = list(self._find_inconsistent_revision_parents())
 
1153
        if inconsistencies:
 
1154
            raise errors.BzrCheckError(
 
1155
                "Revision index has inconsistent parents.")
 
1156
 
 
1157
 
 
1158
class GroupCHKStreamSource(StreamSource):
 
1159
    """Used when both the source and target repo are GroupCHK repos."""
 
1160
 
 
1161
    def __init__(self, from_repository, to_format):
 
1162
        """Create a StreamSource streaming from from_repository."""
 
1163
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
 
1164
        self._revision_keys = None
 
1165
        self._text_keys = None
 
1166
        self._text_fetch_order = 'groupcompress'
 
1167
        self._chk_id_roots = None
 
1168
        self._chk_p_id_roots = None
 
1169
 
 
1170
    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
 
1171
        """Get a stream of inventory texts.
 
1172
 
 
1173
        When this function returns, self._chk_id_roots and self._chk_p_id_roots
 
1174
        should be populated.
 
1175
        """
 
1176
        self._chk_id_roots = []
 
1177
        self._chk_p_id_roots = []
 
1178
        def _filtered_inv_stream():
 
1179
            id_roots_set = set()
 
1180
            p_id_roots_set = set()
 
1181
            source_vf = self.from_repository.inventories
 
1182
            stream = source_vf.get_record_stream(inventory_keys,
 
1183
                                                 'groupcompress', True)
 
1184
            for record in stream:
 
1185
                if record.storage_kind == 'absent':
 
1186
                    if allow_absent:
 
1187
                        continue
 
1188
                    else:
 
1189
                        raise errors.NoSuchRevision(self, record.key)
 
1190
                bytes = record.get_bytes_as('fulltext')
 
1191
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
1192
                                                             record.key)
 
1193
                key = chk_inv.id_to_entry.key()
 
1194
                if key not in id_roots_set:
 
1195
                    self._chk_id_roots.append(key)
 
1196
                    id_roots_set.add(key)
 
1197
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
1198
                if p_id_map is None:
 
1199
                    raise AssertionError('Parent id -> file_id map not set')
 
1200
                key = p_id_map.key()
 
1201
                if key not in p_id_roots_set:
 
1202
                    p_id_roots_set.add(key)
 
1203
                    self._chk_p_id_roots.append(key)
 
1204
                yield record
 
1205
            # We have finished processing all of the inventory records, we
 
1206
            # don't need these sets anymore
 
1207
            id_roots_set.clear()
 
1208
            p_id_roots_set.clear()
 
1209
        return ('inventories', _filtered_inv_stream())
 
1210
 
 
1211
    def _get_filtered_chk_streams(self, excluded_revision_keys):
 
1212
        self._text_keys = set()
 
1213
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
 
1214
        if not excluded_revision_keys:
 
1215
            uninteresting_root_keys = set()
 
1216
            uninteresting_pid_root_keys = set()
 
1217
        else:
 
1218
            # filter out any excluded revisions whose inventories are not
 
1219
            # actually present
 
1220
            # TODO: Update Repository.iter_inventories() to add
 
1221
            #       ignore_missing=True
 
1222
            present_keys = self.from_repository._find_present_inventory_keys(
 
1223
                            excluded_revision_keys)
 
1224
            present_ids = [k[-1] for k in present_keys]
 
1225
            uninteresting_root_keys = set()
 
1226
            uninteresting_pid_root_keys = set()
 
1227
            for inv in self.from_repository.iter_inventories(present_ids):
 
1228
                uninteresting_root_keys.add(inv.id_to_entry.key())
 
1229
                uninteresting_pid_root_keys.add(
 
1230
                    inv.parent_id_basename_to_file_id.key())
 
1231
        chk_bytes = self.from_repository.chk_bytes
 
1232
        def _filter_id_to_entry():
 
1233
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
 
1234
                        self._chk_id_roots, uninteresting_root_keys)
 
1235
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
 
1236
                    chk_map._bytes_to_text_key):
 
1237
                if record is not None:
 
1238
                    yield record
 
1239
            # Consumed
 
1240
            self._chk_id_roots = None
 
1241
        yield 'chk_bytes', _filter_id_to_entry()
 
1242
        def _get_parent_id_basename_to_file_id_pages():
 
1243
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
 
1244
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
 
1245
                if record is not None:
 
1246
                    yield record
 
1247
            # Consumed
 
1248
            self._chk_p_id_roots = None
 
1249
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
 
1250
 
 
1251
    def _get_text_stream(self):
 
1252
        # Note: We know we don't have to handle adding root keys, because both
 
1253
        # the source and target are the identical network name.
 
1254
        text_stream = self.from_repository.texts.get_record_stream(
 
1255
                        self._text_keys, self._text_fetch_order, False)
 
1256
        return ('texts', text_stream)
 
1257
 
 
1258
    def get_stream(self, search):
 
1259
        def wrap_and_count(pb, rc, stream):
 
1260
            """Yield records from stream while showing progress."""
 
1261
            count = 0
 
1262
            for record in stream:
 
1263
                if count == rc.STEP:
 
1264
                    rc.increment(count)
 
1265
                    pb.update('Estimate', rc.current, rc.max)
 
1266
                    count = 0
 
1267
                count += 1
 
1268
                yield record
 
1269
 
 
1270
        revision_ids = search.get_keys()
 
1271
        pb = ui.ui_factory.nested_progress_bar()
 
1272
        rc = self._record_counter
 
1273
        self._record_counter.setup(len(revision_ids))
 
1274
        for stream_info in self._fetch_revision_texts(revision_ids):
 
1275
            yield (stream_info[0],
 
1276
                wrap_and_count(pb, rc, stream_info[1]))
 
1277
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
 
1278
        # TODO: The keys to exclude might be part of the search recipe
 
1279
        # For now, exclude all parents that are at the edge of ancestry, for
 
1280
        # which we have inventories
 
1281
        from_repo = self.from_repository
 
1282
        parent_keys = from_repo._find_parent_keys_of_revisions(
 
1283
                        self._revision_keys)
 
1284
        self.from_repository.revisions.clear_cache()
 
1285
        self.from_repository.signatures.clear_cache()
 
1286
        # Clear the repo's get_parent_map cache too.
 
1287
        self.from_repository._unstacked_provider.disable_cache()
 
1288
        self.from_repository._unstacked_provider.enable_cache()
 
1289
        s = self._get_inventory_stream(self._revision_keys)
 
1290
        yield (s[0], wrap_and_count(pb, rc, s[1]))
 
1291
        self.from_repository.inventories.clear_cache()
 
1292
        for stream_info in self._get_filtered_chk_streams(parent_keys):
 
1293
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
 
1294
        self.from_repository.chk_bytes.clear_cache()
 
1295
        s = self._get_text_stream()
 
1296
        yield (s[0], wrap_and_count(pb, rc, s[1]))
 
1297
        self.from_repository.texts.clear_cache()
 
1298
        pb.update('Done', rc.max, rc.max)
 
1299
        pb.finished()
 
1300
 
 
1301
    def get_stream_for_missing_keys(self, missing_keys):
 
1302
        # missing keys can only occur when we are byte copying and not
 
1303
        # translating (because translation means we don't send
 
1304
        # unreconstructable deltas ever).
 
1305
        missing_inventory_keys = set()
 
1306
        for key in missing_keys:
 
1307
            if key[0] != 'inventories':
 
1308
                raise AssertionError('The only missing keys we should'
 
1309
                    ' be filling in are inventory keys, not %s'
 
1310
                    % (key[0],))
 
1311
            missing_inventory_keys.add(key[1:])
 
1312
        if self._chk_id_roots or self._chk_p_id_roots:
 
1313
            raise AssertionError('Cannot call get_stream_for_missing_keys'
 
1314
                ' until all of get_stream() has been consumed.')
 
1315
        # Yield the inventory stream, so we can find the chk stream
 
1316
        # Some of the missing_keys will be missing because they are ghosts.
 
1317
        # As such, we can ignore them. The Sink is required to verify there are
 
1318
        # no unavailable texts when the ghost inventories are not filled in.
 
1319
        yield self._get_inventory_stream(missing_inventory_keys,
 
1320
                                         allow_absent=True)
 
1321
        # We use the empty set for excluded_revision_keys, to make it clear
 
1322
        # that we want to transmit all referenced chk pages.
 
1323
        for stream_info in self._get_filtered_chk_streams(set()):
 
1324
            yield stream_info
 
1325
 
 
1326
 
 
1327
class _InterestingKeyInfo(object):
 
1328
    def __init__(self):
 
1329
        self.interesting_root_keys = set()
 
1330
        self.interesting_pid_root_keys = set()
 
1331
        self.uninteresting_root_keys = set()
 
1332
        self.uninteresting_pid_root_keys = set()
 
1333
 
 
1334
    def all_interesting(self):
 
1335
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)
 
1336
 
 
1337
    def all_uninteresting(self):
 
1338
        return self.uninteresting_root_keys.union(
 
1339
            self.uninteresting_pid_root_keys)
 
1340
 
 
1341
    def all_keys(self):
 
1342
        return self.all_interesting().union(self.all_uninteresting())
 
1343
 
 
1344
 
 
1345
def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
 
1346
    result = _InterestingKeyInfo()
 
1347
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
 
1348
        root_key = inv.id_to_entry.key()
 
1349
        pid_root_key = inv.parent_id_basename_to_file_id.key()
 
1350
        if inv.revision_id in parent_only_inv_ids:
 
1351
            result.uninteresting_root_keys.add(root_key)
 
1352
            result.uninteresting_pid_root_keys.add(pid_root_key)
 
1353
        else:
 
1354
            result.interesting_root_keys.add(root_key)
 
1355
            result.interesting_pid_root_keys.add(pid_root_key)
 
1356
    return result
 
1357
 
 
1358
 
 
1359
def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
 
1360
    """Iterate the result of iter_interesting_nodes, yielding the records
 
1361
    and adding to text_keys.
 
1362
    """
 
1363
    text_keys_update = text_keys.update
 
1364
    for record, items in interesting_nodes_iterable:
 
1365
        text_keys_update([bytes_to_text_key(b) for n,b in items])
 
1366
        yield record
 
1367
 
 
1368
 
 
1369
class RepositoryFormat2a(RepositoryFormatPack):
 
1370
    """A CHK repository that uses the bencode revision serializer."""
 
1371
 
 
1372
    repository_class = CHKInventoryRepository
 
1373
    supports_external_lookups = True
 
1374
    supports_chks = True
 
1375
    _commit_builder_class = PackRootCommitBuilder
 
1376
    rich_root_data = True
 
1377
    _serializer = chk_serializer.chk_bencode_serializer
 
1378
    _commit_inv_deltas = True
 
1379
    # What index classes to use
 
1380
    index_builder_class = BTreeBuilder
 
1381
    index_class = BTreeGraphIndex
 
1382
    # Note: We cannot unpack a delta that references a text we haven't
 
1383
    # seen yet. There are 2 options, work in fulltexts, or require
 
1384
    # topological sorting. Using fulltexts is more optimal for local
 
1385
    # operations, because the source can be smart about extracting
 
1386
    # multiple in-a-row (and sharing strings). Topological is better
 
1387
    # for remote, because we access less data.
 
1388
    _fetch_order = 'unordered'
 
1389
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
 
1390
    fast_deltas = True
 
1391
    pack_compresses = True
 
1392
 
 
1393
    def _get_matching_bzrdir(self):
 
1394
        return controldir.format_registry.make_bzrdir('2a')
 
1395
 
 
1396
    def _ignore_setting_bzrdir(self, format):
 
1397
        pass
 
1398
 
 
1399
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
1400
 
 
1401
    @classmethod
 
1402
    def get_format_string(cls):
 
1403
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
 
1404
 
 
1405
    def get_format_description(self):
 
1406
        """See RepositoryFormat.get_format_description()."""
 
1407
        return ("Repository format 2a - rich roots, group compression"
 
1408
            " and chk inventories")
 
1409
 
 
1410
 
 
1411
class RepositoryFormat2aSubtree(RepositoryFormat2a):
 
1412
    """A 2a repository format that supports nested trees.
 
1413
 
 
1414
    """
 
1415
 
 
1416
    def _get_matching_bzrdir(self):
 
1417
        return controldir.format_registry.make_bzrdir('development-subtree')
 
1418
 
 
1419
    def _ignore_setting_bzrdir(self, format):
 
1420
        pass
 
1421
 
 
1422
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
1423
 
 
1424
    @classmethod
 
1425
    def get_format_string(cls):
 
1426
        return ('Bazaar development format 8\n')
 
1427
 
 
1428
    def get_format_description(self):
 
1429
        """See RepositoryFormat.get_format_description()."""
 
1430
        return ("Development repository format 8 - nested trees, "
 
1431
                "group compression and chk inventories")
 
1432
 
 
1433
    experimental = True
 
1434
    supports_tree_reference = True