/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

Viewing changes to breezy/bzr/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

 
# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

import time

from .. import (
    controldir,
    debug,
    errors,
    osutils,
    revision as _mod_revision,
    trace,
    ui,
    )
from ..bzr import (
    chk_map,
    chk_serializer,
    index as _mod_index,
    inventory,
    pack,
    versionedfile,
    )
from ..bzr.btree_index import (
    BTreeGraphIndex,
    BTreeBuilder,
    )
from ..bzr.groupcompress import (
    _GCGraphIndex,
    GroupCompressVersionedFiles,
    )
from .pack_repo import (
    _DirectPackAccess,
    Pack,
    NewPack,
    PackRepository,
    PackCommitBuilder,
    RepositoryPackCollection,
    RepositoryFormatPack,
    ResumedPack,
    Packer,
    )
from ..bzr.vf_repository import (
    StreamSource,
    )
from ..static_tuple import StaticTuple

class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised

        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        # from brisbane-core
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
                      # Revisions: parents list, no text compression.
                      index_builder_class(reference_lists=1),
                      # Inventory: We want to map compression only, but currently the
                      # knit code hasn't been updated enough to understand that, so we
                      # have a regular 2-list index giving parents and compression
                      # source.
                      index_builder_class(reference_lists=1),
                      # Texts: per file graph, for all fileids - so one reference list
                      # and two elements in the key tuple.
                      index_builder_class(reference_lists=1, key_elements=2),
                      # Signatures: Just blobs to store, no compression, no parents
                      # listing.
                      index_builder_class(reference_lists=0),
                      # CHK based storage - just blobs, no compression or parents.
                      chk_index=chk_index
                      )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                         time.ctime(), self.upload_transport.base, self.random_name,
                         time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.

        def _write_data(data, flush=False, _buffer=self._buffer,
                        _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(data)
            _buffer[1] += len(data)
            # buffer cap
            if _buffer[1] > self._cache_limit or flush:
                data = b''.join(_buffer[0])
                _write(data)
                _update(data)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
        # no name until we finish writing the content
        self.name = None
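
    # A minimal editorial sketch (not part of this module) of the buffered
    # write pattern used in __init__ above: a closure keeps the buffer and the
    # write targets as local variables and flushes once the cache limit is
    # exceeded. `stream` and `hasher` are stand-ins for the transport write
    # stream and the md5 hash.
    #
    #     def make_buffered_writer(stream, hasher, cache_limit=0):
    #         buffer = [[], 0]  # pending chunks, total byte count
    #
    #         def write(data, flush=False):
    #             buffer[0].append(data)
    #             buffer[1] += len(data)
    #             if buffer[1] > cache_limit or flush:
    #                 joined = b''.join(buffer[0])
    #                 stream.write(joined)
    #                 hasher.update(joined)
    #                 buffer[:] = [[], 0]
    #         return write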
 
 
    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection.  It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references; arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError(
                'pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        def pb_stream():
            substream = source_vf.get_record_stream(
                keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                if pb is not None:
                    pb.update(message, idx + 1, len(keys))
                yield record
        return pb_stream()

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)

        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                lines = record.get_bytes_as('lines')
                chk_inv = inventory.CHKInventory.deserialise(
                    None, lines, record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records; we
            # don't need these sets anymore.
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()
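
    # An editorial sketch (not part of this module) of the order-preserving
    # de-duplication used above: each root key is recorded the first time it
    # is seen, so the later CHK walk visits roots in the order the
    # inventories were streamed.
    #
    #     def first_seen_order(keys):
    #         seen = set()
    #         ordered = []
    #         for key in keys:
    #             if key not in seen:
    #                 seen.add(key)
    #                 ordered.append(key)
    #         return ordered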
 
 
    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.

        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'.
        # This keeps 'similar' nodes together.

        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        #       Test the difference between using one Group per level, and
        #       using 1 Group per prefix. (so '' (root) would get a group, then
        #       all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            self._text_refs = set()

        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            cur_keys = root_keys
            while cur_keys:
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                next_keys = set()

                def handle_internal_node(node):
                    for prefix, value in node._items.items():
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # packed.)
                        # TODO: consider how to treat externally referenced chk
                        #       pages as 'external_references' so that we
                        #       always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                                                             []).append(value)
                            next_keys.add(value)

                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))

                def next_stream():
                    stream = source_vf.get_record_stream(cur_keys,
                                                         'as-requested', True)
                    for record in stream:
                        if record.storage_kind == 'absent':
                            # An absent CHK record: we assume that the missing
                            # record is in a different pack - e.g. a page not
                            # altered by the commit we're packing.
                            continue
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                                                    search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                        counter[0] += 1
                        if pb is not None:
                            pb.update('chk node', counter[0], total_keys)
                        yield record
                yield next_stream()
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                cur_keys = []
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # available keys
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        if remaining_keys:
            trace.mutter('There were %d keys in the chk index, %d of which'
                         ' were not referenced', total_keys,
                         len(remaining_keys))
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,
                                                     'unordered', True)
                yield stream
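
    # An editorial sketch (not part of this module) of the traversal order
    # implemented above: a breadth-first walk in which each level's children
    # are regrouped by search-key prefix before being visited, keeping
    # 'similar' nodes adjacent in the output. `children_of` stands in for
    # reading and parsing a CHK node; de-duplication is omitted for brevity.
    #
    #     def prefix_grouped_walk(roots, children_of):
    #         cur_keys = list(roots)
    #         while cur_keys:
    #             by_prefix = {}
    #             for key in cur_keys:
    #                 yield key
    #                 for prefix, child in children_of(key):
    #                     by_prefix.setdefault(prefix, []).append(child)
    #             cur_keys = []
    #             for prefix in sorted(by_prefix):
    #                 cur_keys.extend(by_prefix.pop(prefix))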
 
 
    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = _DirectPackAccess(index_to_pack,
                                   reload_func=self._reload_func)
        if for_write:
            # Use new_pack
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                          add_callback=add_callback,
                          parents=parents,
                          is_locked=self._pack_collection.repo.is_locked),
            access=access,
            delta=delta)
        return vf

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _, _ in target_vf._insert_record_stream(
                    stream, random_id=True, reuse_blocks=False):
                pass

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        # than this.
        inventory_keys = source_vf.keys()
        missing_inventories = set(
            self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            # operation.
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
            if really_missing:
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                                 % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        #       remove it
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _, _ in target_vf._insert_record_stream(
                        stream, random_id=True, reuse_blocks=False):
                    pass

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys.  We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        # Only copy signatures for the revisions being packed.
        signature_keys = signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC?
        self.new_pack.set_write_cache_size(1024 * 1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                             old_pack.name)
                self.new_pack.abort()
                return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``brz reconcile`` to cause parent text pointers to be
    regenerated.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.items())
        del ancestor_keys
        # TODO: _generate_text_key_index should be much cheaper to generate from
        #       a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        #    be used as-is, with corrected parents.
        ok_keys = []
        new_parent_keys = {}  # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            # 0 - index
            # 1 - key
            # 2 - value
            # 3 - refs
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # no change needed.
                    ok_keys.append(key)
                else:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del ideal_index
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it

        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed.  (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        """
        self.pb.update('scanning %s' % (message,), pb_offset)
        with ui.ui_factory.nested_progress_bar() as child_pb:
            list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct.  This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # few unused CHKs.
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                             self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.

        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.
        pass

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
                                                pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        are canonical.
        """
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)

        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                lines = record.get_bytes_as('lines')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, lines, record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in search_key_reg.items():
                        if func == chk_inv.id_to_entry._search_key_func:
                            break
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                    trace.mutter(
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)'
                        % (chk_inv.revision_id, chk_inv.id_to_entry.key()[0],
                           canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                    trace.mutter(
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                           canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(
                    record.key, record.parents, record.sha1, canonical_inv.to_lines(),
                    chunks_are_lines=True)
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions
        in this write group.

        :returns: list of strs, summarising any problems found.  If the list is
            empty no problems were found.
        """
        # Ensure that all revisions added in this write group have:
        #   - corresponding inventories,
        #   - chk root entries for those inventories,
        #   - and any present parent inventories have their chk root
        #     entries too.
        # And all this should be independent of any fallback repository.
        problems = []
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
            new_revisions_keys)
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                            (sorted(missing_corresponding),))
            return problems
        # Are any chk root entries missing for any inventories?  This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in inv_parent_map.values():
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
            corresponding_invs)
        all_missing = set()
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
            expected_chk_roots)
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
            problems.append(
                "missing referenced chk root keys: %s. "
                "Run 'brz reconcile --canonicalize-chks' on the affected "
                "repository."
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
            return problems
        # Find all interesting chk_bytes records, and make sure they are
        # present, as well as the text keys they reference.
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
        chk_bytes_no_fallbacks._search_key_func = \
            self.repo.chk_bytes._search_key_func
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
            root_key_info.uninteresting_root_keys)
        text_keys = set()
        try:
            for record in _filter_text_keys(chk_diff, text_keys,
                                            chk_map._bytes_to_text_key):
                pass
        except errors.NoSuchRevision:
            # XXX: It would be nice if we could give a more precise error here.
            problems.append("missing chk node(s) for id_to_entry maps")
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
            root_key_info.uninteresting_pid_root_keys)
        try:
            for interesting_rec, interesting_map in chk_diff:
                pass
        except errors.NoSuchRevision:
            problems.append(
                "missing chk node(s) for parent_id_basename_to_file_id maps")
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
        missing_text_keys = text_keys.difference(present_text_keys)
        if missing_text_keys:
            problems.append("missing text keys: %r"
                            % (sorted(missing_text_keys),))
        return problems
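
    # An editorial sketch (not part of this module) of how a caller might act
    # on the report above when committing a write group; the helper name is
    # illustrative.
    #
    #     def _assert_write_group_ok(collection):
    #         problems = collection._check_new_inventories()
    #         if problems:
    #             raise errors.BzrCheckError(
    #                 'Cannot add revision(s) to repository: '
    #                 + '\n'.join(problems))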


class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(
            _format, a_controldir, control_files, _commit_builder_class,
            _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(
            self, self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                          add_callback=self._pack_collection.inventory_index.add_callback,
                          parents=True, is_locked=self.is_locked,
                          inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                          add_callback=self._pack_collection.revision_index.add_callback,
                          parents=True, is_locked=self.is_locked,
                          track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                          add_callback=self._pack_collection.signature_index.add_callback,
                          parents=False, is_locked=self.is_locked,
                          inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                          add_callback=self._pack_collection.text_index.add_callback,
                          parents=True, is_locked=self.is_locked,
                          inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                          add_callback=self._pack_collection.chk_index.add_callback,
                          parents=False, is_locked=self.is_locked,
                          inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        # for tests
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        # make inventory
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(
            self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
                                         inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """Create a new CHKInventory from a delta against the empty inventory.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        """
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                                 ' from the NULL_REVISION'
                                 % ((old_path, file_id),))
            if new_path is None:
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                                 ' no new_path %r' % (file_id,))
            if new_path == '':
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple(b'', b'').intern()
            else:
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                                                     utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # Populate Caches?
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id

        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
                                     parent_id_basename_dict,
                                     maximum_size=serializer.maximum_size)
        return new_inv

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
            against.
        :param delta: The inventory delta (see Inventory.apply_delta for
            details).
        :param new_revision_id: The revision id that the inventory is being
            added for.
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
            compression.
        :param basis_inv: The basis inventory if it is already known,
            otherwise None.
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        basis_tree = None
        if basis_inv is None or not isinstance(basis_inv, inventory.CHKInventory):
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                                                 inv_lines, check_content=False), new_inv
            else:
                basis_tree = self.revision_tree(basis_revision_id)
                basis_tree.lock_read()
                basis_inv = basis_tree.root_inventory
        try:
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                                                     propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                                             inv_lines, check_content=False), result
        finally:
            if basis_tree is not None:
                basis_tree.unlock()
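
    # An editorial sketch (not part of this module) of the inventory delta
    # shape consumed above. Each item is (old_path, new_path, file_id, entry);
    # in a delta against NULL_REVISION every old_path must be None. All
    # identifiers below are illustrative, not real repository data.
    #
    #     delta = [
    #         (None, '', b'root-id',
    #          inventory.InventoryDirectory(b'root-id', '', None)),
    #         (None, 'hello.txt', b'hello-id',
    #          inventory.InventoryFile(b'hello-id', 'hello.txt', b'root-id')),
    #     ]
    #     validator, new_inv = repo.add_inventory_by_delta(
    #         _mod_revision.NULL_REVISION, delta, b'new-rev-id', [])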
 
974
    def _deserialise_inventory(self, revision_id, lines):
 
975
        return inventory.CHKInventory.deserialise(self.chk_bytes, lines,
 
976
                                                  (revision_id,))
 
977
 
 
978
    def _iter_inventories(self, revision_ids, ordering):
 
979
        """Iterate over many inventory objects."""
 
980
        if ordering is None:
 
981
            ordering = 'unordered'
 
982
        keys = [(revision_id,) for revision_id in revision_ids]
 
983
        stream = self.inventories.get_record_stream(keys, ordering, True)
 
984
        texts = {}
 
985
        for record in stream:
 
986
            if record.storage_kind != 'absent':
 
987
                texts[record.key] = record.get_bytes_as('lines')
 
988
            else:
 
989
                texts[record.key] = None
 
990
        for key in keys:
 
991
            lines = texts[key]
 
992
            if lines is None:
 
993
                yield (None, key[-1])
 
994
            else:
 
995
                yield (inventory.CHKInventory.deserialise(
 
996
                    self.chk_bytes, lines, key), key[-1])
 
997
 
 
998
    def _get_inventory_xml(self, revision_id):
 
999
        """Get serialized inventory as a string."""
 
1000
        # Without a native 'xml' inventory, this method doesn't make sense.
 
1001
        # However older working trees, and older bundles want it - so we supply
 
1002
        # it allowing _get_inventory_xml to work. Bundles currently use the
 
1003
        # serializer directly; this also isn't ideal, but there isn't an xml
 
1004
        # iteration interface offered at all for repositories.
 
1005
        return self._serializer.write_inventory_to_lines(
 
1006
            self.get_inventory(revision_id))
 
1007
 
 
1008
    def _find_present_inventory_keys(self, revision_keys):
 
1009
        parent_map = self.inventories.get_parent_map(revision_keys)
 
1010
        present_inventory_keys = set(k for k in parent_map)
 
1011
        return present_inventory_keys
 
1012
 
 
1013
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
 
1014
        """Find the file ids and versions affected by revisions.
 
1015
 
 
1016
        :param revisions: an iterable containing revision ids.
 
1017
        :param _inv_weave: The inventory weave from this repository or None.
 
1018
            If None, the inventory weave will be opened automatically.
 
1019
        :return: a dictionary mapping altered file-ids to an iterable of
 
1020
            revision_ids. Each altered file-ids has the exact revision_ids that
 
1021
            altered it listed explicitly.
 
1022
        """
 
1023
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        with ui.ui_factory.nested_progress_bar() as pb:
            revision_keys = [(r,) for r in revision_ids]
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
            # TODO: instead of using _find_present_inventory_keys, change the
            #       code paths to allow missing inventories to be tolerated.
            #       However, we only want to tolerate missing parent
            #       inventories, not missing inventories for revision_ids
            present_parent_inv_keys = self._find_present_inventory_keys(
                parent_keys)
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
            inventories_to_read = set(revision_ids)
            inventories_to_read.update(present_parent_inv_ids)
            root_key_info = _build_interesting_key_sets(
                self, inventories_to_read, present_parent_inv_ids)
            interesting_root_keys = root_key_info.interesting_root_keys
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(
                    chk_bytes, interesting_root_keys, uninteresting_root_keys,
                    pb=pb):
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    # TODO: consider interning file_id, revision_id here, or
                    #       pushing that intern() into bytes_to_info()
                    # TODO: rich_root should always be True here, for all
                    #       repositories that support chk_bytes
                    if not rich_root and name_utf8 == '':
                        continue
                    try:
                        file_id_revisions[file_id].add(revision_id)
                    except KeyError:
                        file_id_revisions[file_id] = {revision_id}
        return file_id_revisions

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        with ui.ui_factory.nested_progress_bar() as pb:
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:
                        result[key] = True
            return result

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        form.
        """
        from .reconcile import PackReconciler
        with self.lock_write():
            reconciler = PackReconciler(
                self, thorough=True, canonicalize_chks=True)
            return reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise AssertionError()
        vf = self.revisions
        if revisions_iterator is None:
            revisions_iterator = self.iter_revisions(self.all_revision_ids())
        for revid, revision in revisions_iterator:
            if revision is None:
                # Absent revision text (e.g. a ghost): nothing to compare.
                continue
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                                               parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                       parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
        if inconsistencies:
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


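# A minimal usage sketch for the checking and query helpers above. The
# function name and `location` are illustrative placeholders, not part of
# the breezy API; the sketch assumes `location` holds an existing
# 2a-format repository.
def _example_report_altered_fileids(location, revision_ids):
    """Report which file ids were altered by the given revision ids."""
    repo = controldir.ControlDir.open(location).open_repository()
    with repo.lock_read():
        # Surface index/revision parent mismatches before querying.
        repo._check_for_inconsistent_revision_parents()
        altered = repo.fileids_altered_by_revision_ids(revision_ids)
    for file_id, revs in sorted(altered.items()):
        trace.note('%s altered by %d revision(s)', file_id, len(revs))
    return altered

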
class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []

        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                                                 'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    if allow_absent:
                        continue
                    else:
                        raise errors.NoSuchRevision(self, record.key)
                lines = record.get_bytes_as('lines')
                chk_inv = inventory.CHKInventory.deserialise(None, lines,
                                                             record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records; we
            # don't need these sets anymore.
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # actually present
            # TODO: Update Repository.iter_inventories() to add
            #       ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes

        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(
                chk_bytes, self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                                            chk_map._bytes_to_text_key):
                if record is not None:
                    yield record
            # Consumed
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()

        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(
                    chk_bytes, self._chk_p_id_roots,
                    uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            # Consumed
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because
        # the source and target have the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            count = 0
            for record in stream:
                if count == rc.STEP:
                    rc.increment(count)
                    pb.update('Estimate', rc.current, rc.max)
                    count = 0
                count += 1
                yield record

        revision_ids = search.get_keys()
        with ui.ui_factory.nested_progress_bar() as pb:
            rc = self._record_counter
            self._record_counter.setup(len(revision_ids))
            for stream_info in self._fetch_revision_texts(revision_ids):
                yield (stream_info[0],
                       wrap_and_count(pb, rc, stream_info[1]))
            self._revision_keys = [(rev_id,) for rev_id in revision_ids]
            # TODO: The keys to exclude might be part of the search recipe.
            # For now, exclude all parents that are at the edge of ancestry
            # for which we have inventories.
            from_repo = self.from_repository
            parent_keys = from_repo._find_parent_keys_of_revisions(
                self._revision_keys)
            self.from_repository.revisions.clear_cache()
            self.from_repository.signatures.clear_cache()
            # Clear the repo's get_parent_map cache too.
            self.from_repository._unstacked_provider.disable_cache()
            self.from_repository._unstacked_provider.enable_cache()
            s = self._get_inventory_stream(self._revision_keys)
            yield (s[0], wrap_and_count(pb, rc, s[1]))
            self.from_repository.inventories.clear_cache()
            for stream_info in self._get_filtered_chk_streams(parent_keys):
                yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
            self.from_repository.chk_bytes.clear_cache()
            s = self._get_text_stream()
            yield (s[0], wrap_and_count(pb, rc, s[1]))
            self.from_repository.texts.clear_cache()
            pb.update('Done', rc.max, rc.max)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                                     ' be filling in are inventory keys, not %s'
                                     % (key[0],))
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                                 ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
                                         allow_absent=True)
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info


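# A minimal consumer sketch for the stream protocol above; the function
# name is an illustrative placeholder. get_stream() yields
# (substream_name, record_iterator) pairs in a fixed order (revisions,
# inventories, chk pages, then texts), and each iterator must be drained
# before requesting the next, since later substreams depend on state
# (such as self._text_keys) accumulated while earlier ones are consumed.
def _example_drain_stream(source, search):
    counts = {}
    for substream_name, records in source.get_stream(search):
        # Draining the record iterator is what advances the source state.
        counts[substream_name] = counts.get(substream_name, 0) + sum(
            1 for _record in records)
    return counts

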
class _InterestingKeyInfo(object):
    def __init__(self):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

    def all_keys(self):
        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
        else:
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)
    return result


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    """
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
        yield record


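# A self-contained toy run of _filter_text_keys, with hypothetical data
# and an identity function standing in for chk_map._bytes_to_text_key:
# records pass through unchanged while the derived text keys accumulate
# in the caller-supplied set.
def _example_filter_text_keys():
    fake_nodes = [('record-1', [('name-a', ('file-a', 'rev-1'))]),
                  ('record-2', [('name-b', ('file-b', 'rev-2'))])]
    text_keys = set()
    records = list(_filter_text_keys(fake_nodes, text_keys, lambda b: b))
    # records == ['record-1', 'record-2']
    # text_keys == {('file-a', 'rev-1'), ('file-b', 'rev-2')}
    return records, text_keys

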
class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are two options: work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    # essentially ignored by the groupcompress code.
    _fetch_uses_deltas = False
    fast_deltas = True
    pack_compresses = True
    supports_tree_reference = True

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_controldir('2a')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingcontroldir = property(
        _get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return b'Bazaar repository format 2a (needs bzr 1.16 or later)\n'

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
                " and chk inventories")


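# An illustrative sketch of reaching this format through the controldir
# registry, as _get_matching_bzrdir() above does; the helper name and
# `url` are placeholders, and the initialize()/create_repository() chain
# is an assumption about the controldir API rather than a verbatim recipe.
def _example_init_2a_repository(url):
    bzrdir_format = controldir.format_registry.make_controldir('2a')
    a_controldir = bzrdir_format.initialize(url)
    return a_controldir.create_repository(shared=False)

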
class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees."""

    experimental = True
    supports_tree_reference = True

    def _get_matching_bzrdir(self):
        return controldir.format_registry.make_controldir('development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingcontroldir = property(
        _get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return b'Bazaar development format 8\n'

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
                "group compression and chk inventories")
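

# A small self-contained check of how the two formats identify
# themselves; the helper name is illustrative. Each concrete format is
# recognised on disk by its unique format string, and the subtree
# variant is an experimental superset of plain 2a.
def _example_format_strings():
    fmt_2a = RepositoryFormat2a()
    fmt_subtree = RepositoryFormat2aSubtree()
    assert fmt_2a.get_format_string() != fmt_subtree.get_format_string()
    return (fmt_2a.get_format_description(),
            fmt_subtree.get_format_description())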