/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/repofmt/groupcompress_repo.py

  • Committer: Martin
  • Date: 2017-06-10 01:57:00 UTC
  • mto: This revision was merged to the branch mainline in revision 6679.
  • Revision ID: gzlist@googlemail.com-20170610015700-o3xeuyaqry2obiay
Go back to native str for urls and many other py3 changes

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Repository formats using CHK inventories and groupcompress compression."""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
import time
 
22
 
 
23
from .. import (
 
24
    controldir,
 
25
    chk_map,
 
26
    chk_serializer,
 
27
    debug,
 
28
    errors,
 
29
    index as _mod_index,
 
30
    inventory,
 
31
    osutils,
 
32
    pack,
 
33
    revision as _mod_revision,
 
34
    trace,
 
35
    ui,
 
36
    versionedfile,
 
37
    )
 
38
from ..btree_index import (
 
39
    BTreeGraphIndex,
 
40
    BTreeBuilder,
 
41
    )
 
42
from ..decorators import needs_write_lock
 
43
from ..groupcompress import (
 
44
    _GCGraphIndex,
 
45
    GroupCompressVersionedFiles,
 
46
    )
 
47
from .pack_repo import (
 
48
    _DirectPackAccess,
 
49
    Pack,
 
50
    NewPack,
 
51
    PackRepository,
 
52
    PackRootCommitBuilder,
 
53
    RepositoryPackCollection,
 
54
    RepositoryFormatPack,
 
55
    ResumedPack,
 
56
    Packer,
 
57
    )
 
58
from ..vf_repository import (
 
59
    StreamSource,
 
60
    )
 
61
from ..sixish import (
 
62
    viewitems,
 
63
    viewvalues,
 
64
    )
 
65
from ..static_tuple import StaticTuple
 
66
 
 
67
 
 
68
class GCPack(NewPack):
 
69
 
 
70
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
 
71
        """Create a NewPack instance.
 
72
 
 
73
        :param pack_collection: A PackCollection into which this is being
 
74
            inserted.
 
75
        :param upload_suffix: An optional suffix to be given to any temporary
 
76
            files created during the pack creation. e.g '.autopack'
 
77
        :param file_mode: An optional file mode to create the new files with.
 
78
        """
 
79
        # replaced from NewPack to:
 
80
        # - change inventory reference list length to 1
 
81
        # - change texts reference lists to 1
 
82
        # TODO: patch this to be parameterised
 
83
 
 
84
        # The relative locations of the packs are constrained, but all are
 
85
        # passed in because the caller has them, so as to avoid object churn.
 
86
        index_builder_class = pack_collection._index_builder_class
 
87
        # from brisbane-core
 
88
        if pack_collection.chk_index is not None:
 
89
            chk_index = index_builder_class(reference_lists=0)
 
90
        else:
 
91
            chk_index = None
 
92
        Pack.__init__(self,
 
93
            # Revisions: parents list, no text compression.
 
94
            index_builder_class(reference_lists=1),
 
95
            # Inventory: We want to map compression only, but currently the
 
96
            # knit code hasn't been updated enough to understand that, so we
 
97
            # have a regular 2-list index giving parents and compression
 
98
            # source.
 
99
            index_builder_class(reference_lists=1),
 
100
            # Texts: per file graph, for all fileids - so one reference list
 
101
            # and two elements in the key tuple.
 
102
            index_builder_class(reference_lists=1, key_elements=2),
 
103
            # Signatures: Just blobs to store, no compression, no parents
 
104
            # listing.
 
105
            index_builder_class(reference_lists=0),
 
106
            # CHK based storage - just blobs, no compression or parents.
 
107
            chk_index=chk_index
 
108
            )
 
109
        self._pack_collection = pack_collection
 
110
        # When we make readonly indices, we need this.
 
111
        self.index_class = pack_collection._index_class
 
112
        # where should the new pack be opened
 
113
        self.upload_transport = pack_collection._upload_transport
 
114
        # where are indices written out to
 
115
        self.index_transport = pack_collection._index_transport
 
116
        # where is the pack renamed to when it is finished?
 
117
        self.pack_transport = pack_collection._pack_transport
 
118
        # What file mode to upload the pack and indices with.
 
119
        self._file_mode = file_mode
 
120
        # tracks the content written to the .pack file.
 
121
        self._hash = osutils.md5()
 
122
        # a four-tuple with the length in bytes of the indices, once the pack
 
123
        # is finalised. (rev, inv, text, sigs)
 
124
        self.index_sizes = None
 
125
        # How much data to cache when writing packs. Note that this is not
 
126
        # synchronised with reads, because it's not in the transport layer, so
 
127
        # is not safe unless the client knows it won't be reading from the pack
 
128
        # under creation.
 
129
        self._cache_limit = 0
 
130
        # the temporary pack file name.
 
131
        self.random_name = osutils.rand_chars(20) + upload_suffix
 
132
        # when was this pack started ?
 
133
        self.start_time = time.time()
 
134
        # open an output stream for the data added to the pack.
 
135
        self.write_stream = self.upload_transport.open_write_stream(
 
136
            self.random_name, mode=self._file_mode)
 
137
        if 'pack' in debug.debug_flags:
 
138
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
 
139
                time.ctime(), self.upload_transport.base, self.random_name,
 
140
                time.time() - self.start_time)
 
141
        # A list of byte sequences to be written to the new pack, and the
 
142
        # aggregate size of them.  Stored as a list rather than separate
 
143
        # variables so that the _write_data closure below can update them.
 
144
        self._buffer = [[], 0]
 
145
        # create a callable for adding data
 
146
        #
 
147
        # robertc says- this is a closure rather than a method on the object
 
148
        # so that the variables are locals, and faster than accessing object
 
149
        # members.
 
150
        def _write_data(bytes, flush=False, _buffer=self._buffer,
 
151
            _write=self.write_stream.write, _update=self._hash.update):
 
152
            _buffer[0].append(bytes)
 
153
            _buffer[1] += len(bytes)
 
154
            # buffer cap
 
155
            if _buffer[1] > self._cache_limit or flush:
 
156
                bytes = ''.join(_buffer[0])
 
157
                _write(bytes)
 
158
                _update(bytes)
 
159
                _buffer[:] = [[], 0]
 
160
        # expose this on self, for the occasion when clients want to add data.
 
161
        self._write_data = _write_data
 
162
        # a pack writer object to serialise pack records.
 
163
        self._writer = pack.ContainerWriter(self._write_data)
 
164
        self._writer.begin()
 
165
        # what state is the pack in? (open, finished, aborted)
 
166
        self._state = 'open'
 
167
        # no name until we finish writing the content
 
168
        self.name = None
 
169
 
 
170
    def _check_references(self):
 
171
        """Make sure our external references are present.
 
172
 
 
173
        Packs are allowed to have deltas whose base is not in the pack, but it
 
174
        must be present somewhere in this collection.  It is not allowed to
 
175
        have deltas based on a fallback repository.
 
176
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
 
177
        """
 
178
        # Groupcompress packs don't have any external references, arguably CHK
 
179
        # pages have external references, but we cannot 'cheaply' determine
 
180
        # them without actually walking all of the chk pages.
 
181
 
 
182
 
 
183
class ResumedGCPack(ResumedPack):
 
184
 
 
185
    def _check_references(self):
 
186
        """Make sure our external compression parents are present."""
 
187
        # See GCPack._check_references for why this is empty
 
188
 
 
189
    def _get_external_refs(self, index):
 
190
        # GC repositories don't have compression parents external to a given
 
191
        # pack file
 
192
        return set()
 
193
 
 
194
 
 
195
class GCCHKPacker(Packer):
 
196
    """This class understand what it takes to collect a GCCHK repo."""
 
197
 
 
198
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
 
199
                 reload_func=None):
 
200
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
 
201
                                          revision_ids=revision_ids,
 
202
                                          reload_func=reload_func)
 
203
        self._pack_collection = pack_collection
 
204
        # ATM, We only support this for GCCHK repositories
 
205
        if pack_collection.chk_index is None:
 
206
            raise AssertionError('pack_collection.chk_index should not be None')
 
207
        self._gather_text_refs = False
 
208
        self._chk_id_roots = []
 
209
        self._chk_p_id_roots = []
 
210
        self._text_refs = None
 
211
        # set by .pack() if self.revision_ids is not None
 
212
        self.revision_keys = None
 
213
 
 
214
    def _get_progress_stream(self, source_vf, keys, message, pb):
 
215
        def pb_stream():
 
216
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
217
            for idx, record in enumerate(substream):
 
218
                if pb is not None:
 
219
                    pb.update(message, idx + 1, len(keys))
 
220
                yield record
 
221
        return pb_stream()
 
222
 
 
223
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
 
224
        """Filter the texts of inventories, to find the chk pages."""
 
225
        total_keys = len(keys)
 
226
        def _filtered_inv_stream():
 
227
            id_roots_set = set()
 
228
            p_id_roots_set = set()
 
229
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
230
            for idx, record in enumerate(stream):
 
231
                # Inventories should always be with revisions; assume success.
 
232
                bytes = record.get_bytes_as('fulltext')
 
233
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
234
                                                             record.key)
 
235
                if pb is not None:
 
236
                    pb.update('inv', idx, total_keys)
 
237
                key = chk_inv.id_to_entry.key()
 
238
                if key not in id_roots_set:
 
239
                    self._chk_id_roots.append(key)
 
240
                    id_roots_set.add(key)
 
241
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
242
                if p_id_map is None:
 
243
                    raise AssertionError('Parent id -> file_id map not set')
 
244
                key = p_id_map.key()
 
245
                if key not in p_id_roots_set:
 
246
                    p_id_roots_set.add(key)
 
247
                    self._chk_p_id_roots.append(key)
 
248
                yield record
 
249
            # We have finished processing all of the inventory records, we
 
250
            # don't need these sets anymore
 
251
            id_roots_set.clear()
 
252
            p_id_roots_set.clear()
 
253
        return _filtered_inv_stream()
 
254
 
 
255
    def _get_chk_streams(self, source_vf, keys, pb=None):
 
256
        # We want to stream the keys from 'id_roots', and things they
 
257
        # reference, and then stream things from p_id_roots and things they
 
258
        # reference, and then any remaining keys that we didn't get to.
 
259
 
 
260
        # We also group referenced texts together, so if one root references a
 
261
        # text with prefix 'a', and another root references a node with prefix
 
262
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
 
263
        # This keeps 'similar' nodes together.
 
264
 
 
265
        # Note: We probably actually want multiple streams here, to help the
 
266
        #       client understand that the different levels won't compress well
 
267
        #       against each other.
 
268
        #       Test the difference between using one Group per level, and
 
269
        #       using 1 Group per prefix. (so '' (root) would get a group, then
 
270
        #       all the references to search-key 'a' would get a group, etc.)
 
271
        total_keys = len(keys)
 
272
        remaining_keys = set(keys)
 
273
        counter = [0]
 
274
        if self._gather_text_refs:
 
275
            self._text_refs = set()
 
276
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
 
277
            cur_keys = root_keys
 
278
            while cur_keys:
 
279
                keys_by_search_prefix = {}
 
280
                remaining_keys.difference_update(cur_keys)
 
281
                next_keys = set()
 
282
                def handle_internal_node(node):
 
283
                    for prefix, value in viewitems(node._items):
 
284
                        # We don't want to request the same key twice, and we
 
285
                        # want to order it by the first time it is seen.
 
286
                        # Even further, we don't want to request a key which is
 
287
                        # not in this group of pack files (it should be in the
 
288
                        # repo, but it doesn't have to be in the group being
 
289
                        # packed.)
 
290
                        # TODO: consider how to treat externally referenced chk
 
291
                        #       pages as 'external_references' so that we
 
292
                        #       always fill them in for stacked branches
 
293
                        if value not in next_keys and value in remaining_keys:
 
294
                            keys_by_search_prefix.setdefault(prefix,
 
295
                                []).append(value)
 
296
                            next_keys.add(value)
 
297
                def handle_leaf_node(node):
 
298
                    # Store is None, because we know we have a LeafNode, and we
 
299
                    # just want its entries
 
300
                    for file_id, bytes in node.iteritems(None):
 
301
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))
 
302
                def next_stream():
 
303
                    stream = source_vf.get_record_stream(cur_keys,
 
304
                                                         'as-requested', True)
 
305
                    for record in stream:
 
306
                        if record.storage_kind == 'absent':
 
307
                            # An absent CHK record: we assume that the missing
 
308
                            # record is in a different pack - e.g. a page not
 
309
                            # altered by the commit we're packing.
 
310
                            continue
 
311
                        bytes = record.get_bytes_as('fulltext')
 
312
                        # We don't care about search_key_func for this code,
 
313
                        # because we only care about external references.
 
314
                        node = chk_map._deserialise(bytes, record.key,
 
315
                                                    search_key_func=None)
 
316
                        common_base = node._search_prefix
 
317
                        if isinstance(node, chk_map.InternalNode):
 
318
                            handle_internal_node(node)
 
319
                        elif parse_leaf_nodes:
 
320
                            handle_leaf_node(node)
 
321
                        counter[0] += 1
 
322
                        if pb is not None:
 
323
                            pb.update('chk node', counter[0], total_keys)
 
324
                        yield record
 
325
                yield next_stream()
 
326
                # Double check that we won't be emitting any keys twice
 
327
                # If we get rid of the pre-calculation of all keys, we could
 
328
                # turn this around and do
 
329
                # next_keys.difference_update(seen_keys)
 
330
                # However, we also may have references to chk pages in another
 
331
                # pack file during autopack. We filter earlier, so we should no
 
332
                # longer need to do this
 
333
                # next_keys = next_keys.intersection(remaining_keys)
 
334
                cur_keys = []
 
335
                for prefix in sorted(keys_by_search_prefix):
 
336
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
 
337
        for stream in _get_referenced_stream(self._chk_id_roots,
 
338
                                             self._gather_text_refs):
 
339
            yield stream
 
340
        del self._chk_id_roots
 
341
        # while it isn't really possible for chk_id_roots to not be in the
 
342
        # local group of packs, it is possible that the tree shape has not
 
343
        # changed recently, so we need to filter _chk_p_id_roots by the
 
344
        # available keys
 
345
        chk_p_id_roots = [key for key in self._chk_p_id_roots
 
346
                          if key in remaining_keys]
 
347
        del self._chk_p_id_roots
 
348
        for stream in _get_referenced_stream(chk_p_id_roots, False):
 
349
            yield stream
 
350
        if remaining_keys:
 
351
            trace.mutter('There were %d keys in the chk index, %d of which'
 
352
                         ' were not referenced', total_keys,
 
353
                         len(remaining_keys))
 
354
            if self.revision_ids is None:
 
355
                stream = source_vf.get_record_stream(remaining_keys,
 
356
                                                     'unordered', True)
 
357
                yield stream
 
358
 
 
359
    def _build_vf(self, index_name, parents, delta, for_write=False):
 
360
        """Build a VersionedFiles instance on top of this group of packs."""
 
361
        index_name = index_name + '_index'
 
362
        index_to_pack = {}
 
363
        access = _DirectPackAccess(index_to_pack,
 
364
                                   reload_func=self._reload_func)
 
365
        if for_write:
 
366
            # Use new_pack
 
367
            if self.new_pack is None:
 
368
                raise AssertionError('No new pack has been set')
 
369
            index = getattr(self.new_pack, index_name)
 
370
            index_to_pack[index] = self.new_pack.access_tuple()
 
371
            index.set_optimize(for_size=True)
 
372
            access.set_writer(self.new_pack._writer, index,
 
373
                              self.new_pack.access_tuple())
 
374
            add_callback = index.add_nodes
 
375
        else:
 
376
            indices = []
 
377
            for pack in self.packs:
 
378
                sub_index = getattr(pack, index_name)
 
379
                index_to_pack[sub_index] = pack.access_tuple()
 
380
                indices.append(sub_index)
 
381
            index = _mod_index.CombinedGraphIndex(indices)
 
382
            add_callback = None
 
383
        vf = GroupCompressVersionedFiles(
 
384
            _GCGraphIndex(index,
 
385
                          add_callback=add_callback,
 
386
                          parents=parents,
 
387
                          is_locked=self._pack_collection.repo.is_locked),
 
388
            access=access,
 
389
            delta=delta)
 
390
        return vf
 
391
 
 
392
    def _build_vfs(self, index_name, parents, delta):
 
393
        """Build the source and target VersionedFiles."""
 
394
        source_vf = self._build_vf(index_name, parents,
 
395
                                   delta, for_write=False)
 
396
        target_vf = self._build_vf(index_name, parents,
 
397
                                   delta, for_write=True)
 
398
        return source_vf, target_vf
 
399
 
 
400
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
 
401
                     pb_offset):
 
402
        trace.mutter('repacking %d %s', len(keys), message)
 
403
        self.pb.update('repacking %s' % (message,), pb_offset)
 
404
        child_pb = ui.ui_factory.nested_progress_bar()
 
405
        try:
 
406
            stream = vf_to_stream(source_vf, keys, message, child_pb)
 
407
            for _ in target_vf._insert_record_stream(stream,
 
408
                                                     random_id=True,
 
409
                                                     reuse_blocks=False):
 
410
                pass
 
411
        finally:
 
412
            child_pb.finished()
 
413
 
 
414
    def _copy_revision_texts(self):
 
415
        source_vf, target_vf = self._build_vfs('revision', True, False)
 
416
        if not self.revision_keys:
 
417
            # We are doing a full fetch, aka 'pack'
 
418
            self.revision_keys = source_vf.keys()
 
419
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
420
                          'revisions', self._get_progress_stream, 1)
 
421
 
 
422
    def _copy_inventory_texts(self):
 
423
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
424
        # It is not sufficient to just use self.revision_keys, as stacked
 
425
        # repositories can have more inventories than they have revisions.
 
426
        # One alternative would be to do something with
 
427
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
 
428
        # than this.
 
429
        inventory_keys = source_vf.keys()
 
430
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
 
431
        if missing_inventories:
 
432
            # Go back to the original repo, to see if these are really missing
 
433
            # https://bugs.launchpad.net/bzr/+bug/437003
 
434
            # If we are packing a subset of the repo, it is fine to just have
 
435
            # the data in another Pack file, which is not included in this pack
 
436
            # operation.
 
437
            inv_index = self._pack_collection.repo.inventories._index
 
438
            pmap = inv_index.get_parent_map(missing_inventories)
 
439
            really_missing = missing_inventories.difference(pmap)
 
440
            if really_missing:
 
441
                missing_inventories = sorted(really_missing)
 
442
                raise ValueError('We are missing inventories for revisions: %s'
 
443
                    % (missing_inventories,))
 
444
        self._copy_stream(source_vf, target_vf, inventory_keys,
 
445
                          'inventories', self._get_filtered_inv_stream, 2)
 
446
 
 
447
    def _get_chk_vfs_for_copy(self):
 
448
        return self._build_vfs('chk', False, False)
 
449
 
 
450
    def _copy_chk_texts(self):
 
451
        source_vf, target_vf = self._get_chk_vfs_for_copy()
 
452
        # TODO: This is technically spurious... if it is a performance issue,
 
453
        #       remove it
 
454
        total_keys = source_vf.keys()
 
455
        trace.mutter('repacking chk: %d id_to_entry roots,'
 
456
                     ' %d p_id_map roots, %d total keys',
 
457
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
 
458
                     len(total_keys))
 
459
        self.pb.update('repacking chk', 3)
 
460
        child_pb = ui.ui_factory.nested_progress_bar()
 
461
        try:
 
462
            for stream in self._get_chk_streams(source_vf, total_keys,
 
463
                                                pb=child_pb):
 
464
                for _ in target_vf._insert_record_stream(stream,
 
465
                                                         random_id=True,
 
466
                                                         reuse_blocks=False):
 
467
                    pass
 
468
        finally:
 
469
            child_pb.finished()
 
470
 
 
471
    def _copy_text_texts(self):
 
472
        source_vf, target_vf = self._build_vfs('text', True, True)
 
473
        # XXX: We don't walk the chk map to determine referenced (file_id,
 
474
        #      revision_id) keys.  We don't do it yet because you really need
 
475
        #      to filter out the ones that are present in the parents of the
 
476
        #      rev just before the ones you are copying, otherwise the filter
 
477
        #      is grabbing too many keys...
 
478
        text_keys = source_vf.keys()
 
479
        self._copy_stream(source_vf, target_vf, text_keys,
 
480
                          'texts', self._get_progress_stream, 4)
 
481
 
 
482
    def _copy_signature_texts(self):
 
483
        source_vf, target_vf = self._build_vfs('signature', False, False)
 
484
        signature_keys = source_vf.keys()
 
485
        signature_keys.intersection(self.revision_keys)
 
486
        self._copy_stream(source_vf, target_vf, signature_keys,
 
487
                          'signatures', self._get_progress_stream, 5)
 
488
 
 
489
    def _create_pack_from_packs(self):
 
490
        self.pb.update('repacking', 0, 7)
 
491
        self.new_pack = self.open_pack()
 
492
        # Is this necessary for GC ?
 
493
        self.new_pack.set_write_cache_size(1024*1024)
 
494
        self._copy_revision_texts()
 
495
        self._copy_inventory_texts()
 
496
        self._copy_chk_texts()
 
497
        self._copy_text_texts()
 
498
        self._copy_signature_texts()
 
499
        self.new_pack._check_references()
 
500
        if not self._use_pack(self.new_pack):
 
501
            self.new_pack.abort()
 
502
            return None
 
503
        self.new_pack.finish_content()
 
504
        if len(self.packs) == 1:
 
505
            old_pack = self.packs[0]
 
506
            if old_pack.name == self.new_pack._hash.hexdigest():
 
507
                # The single old pack was already optimally packed.
 
508
                trace.mutter('single pack %s was already optimally packed',
 
509
                    old_pack.name)
 
510
                self.new_pack.abort()
 
511
                return None
 
512
        self.pb.update('finishing repack', 6, 7)
 
513
        self.new_pack.finish()
 
514
        self._pack_collection.allocate(self.new_pack)
 
515
        return self.new_pack
 
516
 
 
517
 
 
518
class GCCHKReconcilePacker(GCCHKPacker):
 
519
    """A packer which regenerates indices etc as it copies.
 
520
 
 
521
    This is used by ``brz reconcile`` to cause parent text pointers to be
 
522
    regenerated.
 
523
    """
 
524
 
 
525
    def __init__(self, *args, **kwargs):
 
526
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
 
527
        self._data_changed = False
 
528
        self._gather_text_refs = True
 
529
 
 
530
    def _copy_inventory_texts(self):
 
531
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
532
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
533
                          'inventories', self._get_filtered_inv_stream, 2)
 
534
        if source_vf.keys() != self.revision_keys:
 
535
            self._data_changed = True
 
536
 
 
537
    def _copy_text_texts(self):
 
538
        """generate what texts we should have and then copy."""
 
539
        source_vf, target_vf = self._build_vfs('text', True, True)
 
540
        trace.mutter('repacking %d texts', len(self._text_refs))
 
541
        self.pb.update("repacking texts", 4)
 
542
        # we have three major tasks here:
 
543
        # 1) generate the ideal index
 
544
        repo = self._pack_collection.repo
 
545
        # We want the one we just wrote, so base it on self.new_pack
 
546
        revision_vf = self._build_vf('revision', True, False, for_write=True)
 
547
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
 
548
        # Strip keys back into revision_ids.
 
549
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
 
550
                         for k, parents in viewitems(ancestor_keys))
 
551
        del ancestor_keys
 
552
        # TODO: _generate_text_key_index should be much cheaper to generate from
 
553
        #       a chk repository, rather than the current implementation
 
554
        ideal_index = repo._generate_text_key_index(None, ancestors)
 
555
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
 
556
        # 2) generate a keys list that contains all the entries that can
 
557
        #    be used as-is, with corrected parents.
 
558
        ok_keys = []
 
559
        new_parent_keys = {} # (key, parent_keys)
 
560
        discarded_keys = []
 
561
        NULL_REVISION = _mod_revision.NULL_REVISION
 
562
        for key in self._text_refs:
 
563
            # 0 - index
 
564
            # 1 - key
 
565
            # 2 - value
 
566
            # 3 - refs
 
567
            try:
 
568
                ideal_parents = tuple(ideal_index[key])
 
569
            except KeyError:
 
570
                discarded_keys.append(key)
 
571
                self._data_changed = True
 
572
            else:
 
573
                if ideal_parents == (NULL_REVISION,):
 
574
                    ideal_parents = ()
 
575
                source_parents = file_id_parent_map[key]
 
576
                if ideal_parents == source_parents:
 
577
                    # no change needed.
 
578
                    ok_keys.append(key)
 
579
                else:
 
580
                    # We need to change the parent graph, but we don't need to
 
581
                    # re-insert the text (since we don't pun the compression
 
582
                    # parent with the parents list)
 
583
                    self._data_changed = True
 
584
                    new_parent_keys[key] = ideal_parents
 
585
        # we're finished with some data.
 
586
        del ideal_index
 
587
        del file_id_parent_map
 
588
        # 3) bulk copy the data, updating records than need it
 
589
        def _update_parents_for_texts():
 
590
            stream = source_vf.get_record_stream(self._text_refs,
 
591
                'groupcompress', False)
 
592
            for record in stream:
 
593
                if record.key in new_parent_keys:
 
594
                    record.parents = new_parent_keys[record.key]
 
595
                yield record
 
596
        target_vf.insert_record_stream(_update_parents_for_texts())
 
597
 
 
598
    def _use_pack(self, new_pack):
 
599
        """Override _use_pack to check for reconcile having changed content."""
 
600
        return new_pack.data_inserted() and self._data_changed
 
601
 
 
602
 
 
603
class GCCHKCanonicalizingPacker(GCCHKPacker):
 
604
    """A packer that ensures inventories have canonical-form CHK maps.
 
605
    
 
606
    Ideally this would be part of reconcile, but it's very slow and rarely
 
607
    needed.  (It repairs repositories affected by
 
608
    https://bugs.launchpad.net/bzr/+bug/522637).
 
609
    """
 
610
 
 
611
    def __init__(self, *args, **kwargs):
 
612
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
 
613
        self._data_changed = False
 
614
 
 
615
    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
 
616
        """Create and exhaust a stream, but don't insert it.
 
617
 
 
618
        This is useful to get the side-effects of generating a stream.
 
619
        """
 
620
        self.pb.update('scanning %s' % (message,), pb_offset)
 
621
        child_pb = ui.ui_factory.nested_progress_bar()
 
622
        try:
 
623
            list(vf_to_stream(source_vf, keys, message, child_pb))
 
624
        finally:
 
625
            child_pb.finished()
 
626
 
 
627
    def _copy_inventory_texts(self):
 
628
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
629
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
 
630
        inventory_keys = source_vf.keys()
 
631
        # First, copy the existing CHKs on the assumption that most of them
 
632
        # will be correct.  This will save us from having to reinsert (and
 
633
        # recompress) these records later at the cost of perhaps preserving a
 
634
        # few unused CHKs. 
 
635
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
 
636
        # variables needed by GCCHKPacker._copy_chk_texts.)
 
637
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
 
638
                self._get_filtered_inv_stream, 2)
 
639
        GCCHKPacker._copy_chk_texts(self)
 
640
        # Now copy and fix the inventories, and any regenerated CHKs.
 
641
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
 
642
            return self._get_filtered_canonicalizing_inv_stream(
 
643
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
 
644
        self._copy_stream(source_vf, target_vf, inventory_keys,
 
645
                          'inventories', chk_canonicalizing_inv_stream, 4)
 
646
 
 
647
    def _copy_chk_texts(self):
 
648
        # No-op; in this class this happens during _copy_inventory_texts.
 
649
        pass
 
650
 
 
651
    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
 
652
            pb=None, source_chk_vf=None, target_chk_vf=None):
 
653
        """Filter the texts of inventories, regenerating CHKs to make sure they
 
654
        are canonical.
 
655
        """
 
656
        total_keys = len(keys)
 
657
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
 
658
        def _filtered_inv_stream():
 
659
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
660
            search_key_name = None
 
661
            for idx, record in enumerate(stream):
 
662
                # Inventories should always be with revisions; assume success.
 
663
                bytes = record.get_bytes_as('fulltext')
 
664
                chk_inv = inventory.CHKInventory.deserialise(
 
665
                    source_chk_vf, bytes, record.key)
 
666
                if pb is not None:
 
667
                    pb.update('inv', idx, total_keys)
 
668
                chk_inv.id_to_entry._ensure_root()
 
669
                if search_key_name is None:
 
670
                    # Find the name corresponding to the search_key_func
 
671
                    search_key_reg = chk_map.search_key_registry
 
672
                    for search_key_name, func in viewitems(search_key_reg):
 
673
                        if func == chk_inv.id_to_entry._search_key_func:
 
674
                            break
 
675
                canonical_inv = inventory.CHKInventory.from_inventory(
 
676
                    target_chk_vf, chk_inv,
 
677
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
 
678
                    search_key_name=search_key_name)
 
679
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
 
680
                    trace.mutter(
 
681
                        'Non-canonical CHK map for id_to_entry of inv: %s '
 
682
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
 
683
                        chk_inv.id_to_entry.key()[0],
 
684
                        canonical_inv.id_to_entry.key()[0]))
 
685
                    self._data_changed = True
 
686
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
687
                p_id_map._ensure_root()
 
688
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
 
689
                if p_id_map.key() != canon_p_id_map.key():
 
690
                    trace.mutter(
 
691
                        'Non-canonical CHK map for parent_id_to_basename of '
 
692
                        'inv: %s (root is %s, should be %s)'
 
693
                        % (chk_inv.revision_id, p_id_map.key()[0],
 
694
                           canon_p_id_map.key()[0]))
 
695
                    self._data_changed = True
 
696
                yield versionedfile.ChunkedContentFactory(record.key,
 
697
                        record.parents, record.sha1,
 
698
                        canonical_inv.to_lines())
 
699
            # We have finished processing all of the inventory records, we
 
700
            # don't need these sets anymore
 
701
        return _filtered_inv_stream()
 
702
 
 
703
    def _use_pack(self, new_pack):
 
704
        """Override _use_pack to check for reconcile having changed content."""
 
705
        return new_pack.data_inserted() and self._data_changed
 
706
 
 
707
 
 
708
class GCRepositoryPackCollection(RepositoryPackCollection):
 
709
 
 
710
    pack_factory = GCPack
 
711
    resumed_pack_factory = ResumedGCPack
 
712
    normal_packer_class = GCCHKPacker
 
713
    optimising_packer_class = GCCHKPacker
 
714
 
 
715
    def _check_new_inventories(self):
 
716
        """Detect missing inventories or chk root entries for the new revisions
 
717
        in this write group.
 
718
 
 
719
        :returns: list of strs, summarising any problems found.  If the list is
 
720
            empty no problems were found.
 
721
        """
 
722
        # Ensure that all revisions added in this write group have:
 
723
        #   - corresponding inventories,
 
724
        #   - chk root entries for those inventories,
 
725
        #   - and any present parent inventories have their chk root
 
726
        #     entries too.
 
727
        # And all this should be independent of any fallback repository.
 
728
        problems = []
 
729
        key_deps = self.repo.revisions._index._key_dependencies
 
730
        new_revisions_keys = key_deps.get_new_keys()
 
731
        no_fallback_inv_index = self.repo.inventories._index
 
732
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
 
733
        no_fallback_texts_index = self.repo.texts._index
 
734
        inv_parent_map = no_fallback_inv_index.get_parent_map(
 
735
            new_revisions_keys)
 
736
        # Are any inventories for corresponding to the new revisions missing?
 
737
        corresponding_invs = set(inv_parent_map)
 
738
        missing_corresponding = set(new_revisions_keys)
 
739
        missing_corresponding.difference_update(corresponding_invs)
 
740
        if missing_corresponding:
 
741
            problems.append("inventories missing for revisions %s" %
 
742
                (sorted(missing_corresponding),))
 
743
            return problems
 
744
        # Are any chk root entries missing for any inventories?  This includes
 
745
        # any present parent inventories, which may be used when calculating
 
746
        # deltas for streaming.
 
747
        all_inv_keys = set(corresponding_invs)
 
748
        for parent_inv_keys in viewvalues(inv_parent_map):
 
749
            all_inv_keys.update(parent_inv_keys)
 
750
        # Filter out ghost parents.
 
751
        all_inv_keys.intersection_update(
 
752
            no_fallback_inv_index.get_parent_map(all_inv_keys))
 
753
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
 
754
            corresponding_invs)
 
755
        all_missing = set()
 
756
        inv_ids = [key[-1] for key in all_inv_keys]
 
757
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
 
758
        root_key_info = _build_interesting_key_sets(
 
759
            self.repo, inv_ids, parent_invs_only_ids)
 
760
        expected_chk_roots = root_key_info.all_keys()
 
761
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
 
762
            expected_chk_roots)
 
763
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
 
764
        if missing_chk_roots:
 
765
            problems.append(
 
766
                "missing referenced chk root keys: %s."
 
767
                "Run 'brz reconcile --canonicalize-chks' on the affected "
 
768
                "repository."
 
769
                % (sorted(missing_chk_roots),))
 
770
            # Don't bother checking any further.
 
771
            return problems
 
772
        # Find all interesting chk_bytes records, and make sure they are
 
773
        # present, as well as the text keys they reference.
 
774
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
 
775
        chk_bytes_no_fallbacks._search_key_func = \
 
776
            self.repo.chk_bytes._search_key_func
 
777
        chk_diff = chk_map.iter_interesting_nodes(
 
778
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
 
779
            root_key_info.uninteresting_root_keys)
 
780
        text_keys = set()
 
781
        try:
 
782
            for record in _filter_text_keys(chk_diff, text_keys,
 
783
                                            chk_map._bytes_to_text_key):
 
784
                pass
 
785
        except errors.NoSuchRevision as e:
 
786
            # XXX: It would be nice if we could give a more precise error here.
 
787
            problems.append("missing chk node(s) for id_to_entry maps")
 
788
        chk_diff = chk_map.iter_interesting_nodes(
 
789
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
 
790
            root_key_info.uninteresting_pid_root_keys)
 
791
        try:
 
792
            for interesting_rec, interesting_map in chk_diff:
 
793
                pass
 
794
        except errors.NoSuchRevision as e:
 
795
            problems.append(
 
796
                "missing chk node(s) for parent_id_basename_to_file_id maps")
 
797
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
 
798
        missing_text_keys = text_keys.difference(present_text_keys)
 
799
        if missing_text_keys:
 
800
            problems.append("missing text keys: %r"
 
801
                % (sorted(missing_text_keys),))
 
802
        return problems
 
803
 
 
804
 
 
805
class CHKInventoryRepository(PackRepository):
 
806
    """subclass of PackRepository that uses CHK based inventories."""
 
807
 
 
808
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
809
        _serializer):
 
810
        """Overridden to change pack collection class."""
 
811
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
 
812
            control_files, _commit_builder_class, _serializer)
 
813
        index_transport = self._transport.clone('indices')
 
814
        self._pack_collection = GCRepositoryPackCollection(self,
 
815
            self._transport, index_transport,
 
816
            self._transport.clone('upload'),
 
817
            self._transport.clone('packs'),
 
818
            _format.index_builder_class,
 
819
            _format.index_class,
 
820
            use_chk_index=self._format.supports_chks,
 
821
            )
 
822
        self.inventories = GroupCompressVersionedFiles(
 
823
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
824
                add_callback=self._pack_collection.inventory_index.add_callback,
 
825
                parents=True, is_locked=self.is_locked,
 
826
                inconsistency_fatal=False),
 
827
            access=self._pack_collection.inventory_index.data_access)
 
828
        self.revisions = GroupCompressVersionedFiles(
 
829
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
830
                add_callback=self._pack_collection.revision_index.add_callback,
 
831
                parents=True, is_locked=self.is_locked,
 
832
                track_external_parent_refs=True, track_new_keys=True),
 
833
            access=self._pack_collection.revision_index.data_access,
 
834
            delta=False)
 
835
        self.signatures = GroupCompressVersionedFiles(
 
836
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
837
                add_callback=self._pack_collection.signature_index.add_callback,
 
838
                parents=False, is_locked=self.is_locked,
 
839
                inconsistency_fatal=False),
 
840
            access=self._pack_collection.signature_index.data_access,
 
841
            delta=False)
 
842
        self.texts = GroupCompressVersionedFiles(
 
843
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
844
                add_callback=self._pack_collection.text_index.add_callback,
 
845
                parents=True, is_locked=self.is_locked,
 
846
                inconsistency_fatal=False),
 
847
            access=self._pack_collection.text_index.data_access)
 
848
        # No parents, individual CHK pages don't have specific ancestry
 
849
        self.chk_bytes = GroupCompressVersionedFiles(
 
850
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
851
                add_callback=self._pack_collection.chk_index.add_callback,
 
852
                parents=False, is_locked=self.is_locked,
 
853
                inconsistency_fatal=False),
 
854
            access=self._pack_collection.chk_index.data_access)
 
855
        search_key_name = self._format._serializer.search_key_name
 
856
        search_key_func = chk_map.search_key_registry.get(search_key_name)
 
857
        self.chk_bytes._search_key_func = search_key_func
 
858
        # True when the repository object is 'write locked' (as opposed to the
 
859
        # physical lock only taken out around changes to the pack-names list.)
 
860
        # Another way to represent this would be a decorator around the control
 
861
        # files object that presents logical locks as physical ones - if this
 
862
        # gets ugly consider that alternative design. RBC 20071011
 
863
        self._write_lock_count = 0
 
864
        self._transaction = None
 
865
        # for tests
 
866
        self._reconcile_does_inventory_gc = True
 
867
        self._reconcile_fixes_text_parents = True
 
868
        self._reconcile_backsup_inventory = False
 
869
 
 
870
    def _add_inventory_checked(self, revision_id, inv, parents):
 
871
        """Add inv to the repository after checking the inputs.
 
872
 
 
873
        This function can be overridden to allow different inventory styles.
 
874
 
 
875
        :seealso: add_inventory, for the contract.
 
876
        """
 
877
        # make inventory
 
878
        serializer = self._format._serializer
 
879
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
 
880
            maximum_size=serializer.maximum_size,
 
881
            search_key_name=serializer.search_key_name)
 
882
        inv_lines = result.to_lines()
 
883
        return self._inventory_add_lines(revision_id, parents,
 
884
            inv_lines, check_content=False)
 
885
 
 
886
    def _create_inv_from_null(self, delta, revision_id):
 
887
        """This will mutate new_inv directly.
 
888
 
 
889
        This is a simplified form of create_by_apply_delta which knows that all
 
890
        the old values must be None, so everything is a create.
 
891
        """
 
892
        serializer = self._format._serializer
 
893
        new_inv = inventory.CHKInventory(serializer.search_key_name)
 
894
        new_inv.revision_id = revision_id
 
895
        entry_to_bytes = new_inv._entry_to_bytes
 
896
        id_to_entry_dict = {}
 
897
        parent_id_basename_dict = {}
 
898
        for old_path, new_path, file_id, entry in delta:
 
899
            if old_path is not None:
 
900
                raise ValueError('Invalid delta, somebody tried to delete %r'
 
901
                                 ' from the NULL_REVISION'
 
902
                                 % ((old_path, file_id),))
 
903
            if new_path is None:
 
904
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
 
905
                                 ' no new_path %r' % (file_id,))
 
906
            if new_path == '':
 
907
                new_inv.root_id = file_id
 
908
                parent_id_basename_key = StaticTuple('', '').intern()
 
909
            else:
 
910
                utf8_entry_name = entry.name.encode('utf-8')
 
911
                parent_id_basename_key = StaticTuple(entry.parent_id,
 
912
                                                     utf8_entry_name).intern()
 
913
            new_value = entry_to_bytes(entry)
 
914
            # Populate Caches?
 
915
            # new_inv._path_to_fileid_cache[new_path] = file_id
 
916
            key = StaticTuple(file_id).intern()
 
917
            id_to_entry_dict[key] = new_value
 
918
            parent_id_basename_dict[parent_id_basename_key] = file_id
 
919
 
 
920
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
 
921
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
 
922
        return new_inv
 
923
 
 
924
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
 
925
                               parents, basis_inv=None, propagate_caches=False):
 
926
        """Add a new inventory expressed as a delta against another revision.
 
927
 
 
928
        :param basis_revision_id: The inventory id the delta was created
 
929
            against.
 
930
        :param delta: The inventory delta (see Inventory.apply_delta for
 
931
            details).
 
932
        :param new_revision_id: The revision id that the inventory is being
 
933
            added for.
 
934
        :param parents: The revision ids of the parents that revision_id is
 
935
            known to have and are in the repository already. These are supplied
 
936
            for repositories that depend on the inventory graph for revision
 
937
            graph access, as well as for those that pun ancestry with delta
 
938
            compression.
 
939
        :param basis_inv: The basis inventory if it is already known,
 
940
            otherwise None.
 
941
        :param propagate_caches: If True, the caches for this inventory are
 
942
          copied to and updated for the result if possible.
 
943
 
 
944
        :returns: (validator, new_inv)
 
945
            The validator(which is a sha1 digest, though what is sha'd is
 
946
            repository format specific) of the serialized inventory, and the
 
947
            resulting inventory.
 
948
        """
 
949
        if not self.is_in_write_group():
 
950
            raise AssertionError("%r not in write group" % (self,))
 
951
        _mod_revision.check_not_reserved_id(new_revision_id)
 
952
        basis_tree = None
 
953
        if basis_inv is None:
 
954
            if basis_revision_id == _mod_revision.NULL_REVISION:
 
955
                new_inv = self._create_inv_from_null(delta, new_revision_id)
 
956
                if new_inv.root_id is None:
 
957
                    raise errors.RootMissing()
 
958
                inv_lines = new_inv.to_lines()
 
959
                return self._inventory_add_lines(new_revision_id, parents,
 
960
                    inv_lines, check_content=False), new_inv
 
961
            else:
 
962
                basis_tree = self.revision_tree(basis_revision_id)
 
963
                basis_tree.lock_read()
 
964
                basis_inv = basis_tree.root_inventory
 
965
        try:
 
966
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
 
967
                propagate_caches=propagate_caches)
 
968
            inv_lines = result.to_lines()
 
969
            return self._inventory_add_lines(new_revision_id, parents,
 
970
                inv_lines, check_content=False), result
 
971
        finally:
 
972
            if basis_tree is not None:
 
973
                basis_tree.unlock()
 
974
 
 
975
    def _deserialise_inventory(self, revision_id, bytes):
 
976
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
 
977
            (revision_id,))
 
978
 
 
979
    def _iter_inventories(self, revision_ids, ordering):
 
980
        """Iterate over many inventory objects."""
 
981
        if ordering is None:
 
982
            ordering = 'unordered'
 
983
        keys = [(revision_id,) for revision_id in revision_ids]
 
984
        stream = self.inventories.get_record_stream(keys, ordering, True)
 
985
        texts = {}
 
986
        for record in stream:
 
987
            if record.storage_kind != 'absent':
 
988
                texts[record.key] = record.get_bytes_as('fulltext')
 
989
            else:
 
990
                texts[record.key] = None
 
991
        for key in keys:
 
992
            bytes = texts[key]
 
993
            if bytes is None:
 
994
                yield (None, key[-1])
 
995
            else:
 
996
                yield (inventory.CHKInventory.deserialise(
 
997
                    self.chk_bytes, bytes, key), key[-1])
 
998
 
 
999
    def _get_inventory_xml(self, revision_id):
 
1000
        """Get serialized inventory as a string."""
 
1001
        # Without a native 'xml' inventory, this method doesn't make sense.
 
1002
        # However older working trees, and older bundles want it - so we supply
 
1003
        # it allowing _get_inventory_xml to work. Bundles currently use the
 
1004
        # serializer directly; this also isn't ideal, but there isn't an xml
 
1005
        # iteration interface offered at all for repositories.
 
1006
        return self._serializer.write_inventory_to_string(
 
1007
            self.get_inventory(revision_id))
 
1008
 
 
1009
    def _find_present_inventory_keys(self, revision_keys):
 
1010
        parent_map = self.inventories.get_parent_map(revision_keys)
 
1011
        present_inventory_keys = set(k for k in parent_map)
 
1012
        return present_inventory_keys
 
1013
 
 
1014
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
 
1015
        """Find the file ids and versions affected by revisions.
 
1016
 
 
1017
        :param revisions: an iterable containing revision ids.
 
1018
        :param _inv_weave: The inventory weave from this repository or None.
 
1019
            If None, the inventory weave will be opened automatically.
 
1020
        :return: a dictionary mapping altered file-ids to an iterable of
 
1021
            revision_ids. Each altered file-ids has the exact revision_ids that
 
1022
            altered it listed explicitly.
 
1023
        """
 
1024
        rich_root = self.supports_rich_root()
 
1025
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
 
1026
        file_id_revisions = {}
 
1027
        pb = ui.ui_factory.nested_progress_bar()
 
1028
        try:
 
1029
            revision_keys = [(r,) for r in revision_ids]
 
1030
            parent_keys = self._find_parent_keys_of_revisions(revision_keys)
 
1031
            # TODO: instead of using _find_present_inventory_keys, change the
 
1032
            #       code paths to allow missing inventories to be tolerated.
 
1033
            #       However, we only want to tolerate missing parent
 
1034
            #       inventories, not missing inventories for revision_ids
 
1035
            present_parent_inv_keys = self._find_present_inventory_keys(
 
1036
                                        parent_keys)
 
1037
            present_parent_inv_ids = {k[-1] for k in present_parent_inv_keys}
 
1038
            inventories_to_read = set(revision_ids)
 
1039
            inventories_to_read.update(present_parent_inv_ids)
 
1040
            root_key_info = _build_interesting_key_sets(
 
1041
                self, inventories_to_read, present_parent_inv_ids)
 
1042
            interesting_root_keys = root_key_info.interesting_root_keys
 
1043
            uninteresting_root_keys = root_key_info.uninteresting_root_keys
 
1044
            chk_bytes = self.chk_bytes
 
1045
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
 
1046
                        interesting_root_keys, uninteresting_root_keys,
 
1047
                        pb=pb):
 
1048
                for name, bytes in items:
 
1049
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
 
1050
                    # TODO: consider interning file_id, revision_id here, or
 
1051
                    #       pushing that intern() into bytes_to_info()
 
1052
                    # TODO: rich_root should always be True here, for all
 
1053
                    #       repositories that support chk_bytes
 
1054
                    if not rich_root and name_utf8 == '':
 
1055
                        continue
 
1056
                    try:
 
1057
                        file_id_revisions[file_id].add(revision_id)
 
1058
                    except KeyError:
 
1059
                        file_id_revisions[file_id] = {revision_id}
 
1060
        finally:
 
1061
            pb.finished()
 
1062
        return file_id_revisions
 
1063
 
 
1064
    def find_text_key_references(self):
 
1065
        """Find the text key references within the repository.
 
1066
 
 
1067
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
 
1068
            to whether they were referred to by the inventory of the
 
1069
            revision_id that they contain. The inventory texts from all present
 
1070
            revision ids are assessed to generate this report.
 
1071
        """
 
1072
        # XXX: Slow version but correct: rewrite as a series of delta
 
1073
        # examinations/direct tree traversal. Note that that will require care
 
1074
        # as a common node is reachable both from the inventory that added it,
 
1075
        # and others afterwards.
 
1076
        revision_keys = self.revisions.keys()
 
1077
        result = {}
 
1078
        rich_roots = self.supports_rich_root()
 
1079
        pb = ui.ui_factory.nested_progress_bar()
 
1080
        try:
 
1081
            all_revs = self.all_revision_ids()
 
1082
            total = len(all_revs)
 
1083
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
 
1084
                pb.update("Finding text references", pos, total)
 
1085
                for _, entry in inv.iter_entries():
 
1086
                    if not rich_roots and entry.file_id == inv.root_id:
 
1087
                        continue
 
1088
                    key = (entry.file_id, entry.revision)
 
1089
                    result.setdefault(key, False)
 
1090
                    if entry.revision == inv.revision_id:
 
1091
                        result[key] = True
 
1092
            return result
 
1093
        finally:
 
1094
            pb.finished()
 
1095
 
 
1096
    @needs_write_lock
 
1097
    def reconcile_canonicalize_chks(self):
 
1098
        """Reconcile this repository to make sure all CHKs are in canonical
 
1099
        form.
 
1100
        """
 
1101
        from breezy.reconcile import PackReconciler
 
1102
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
 
1103
        reconciler.reconcile()
 
1104
        return reconciler
 
1105
 
 
1106
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
1107
        packer = GCCHKReconcilePacker(collection, packs, extension)
 
1108
        return packer.pack(pb)
 
1109
 
 
1110
    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
 
1111
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
 
1112
        return packer.pack(pb)
 
1113
 
 
1114
    def _get_source(self, to_format):
 
1115
        """Return a source for streaming from this repository."""
 
1116
        if self._format._serializer == to_format._serializer:
 
1117
            # We must be exactly the same format, otherwise stuff like the chk
 
1118
            # page layout might be different.
 
1119
            # Actually, this test is just slightly looser than exact so that
 
1120
            # CHK2 <-> 2a transfers will work.
 
1121
            return GroupCHKStreamSource(self, to_format)
 
1122
        return super(CHKInventoryRepository, self)._get_source(to_format)
 
1123
 
 
1124
    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
 
1125
        """Find revisions with different parent lists in the revision object
 
1126
        and in the index graph.
 
1127
 
 
1128
        :param revisions_iterator: None, or an iterator of (revid,
 
1129
            Revision-or-None). This iterator controls the revisions checked.
 
1130
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
 
1131
            parents-in-revision).
 
1132
        """
 
1133
        if not self.is_locked():
 
1134
            raise AssertionError()
 
1135
        vf = self.revisions
 
1136
        if revisions_iterator is None:
 
1137
            revisions_iterator = self._iter_revisions(None)
 
1138
        for revid, revision in revisions_iterator:
 
1139
            if revision is None:
 
1140
                pass
 
1141
            parent_map = vf.get_parent_map([(revid,)])
 
1142
            parents_according_to_index = tuple(parent[-1] for parent in
 
1143
                parent_map[(revid,)])
 
1144
            parents_according_to_revision = tuple(revision.parent_ids)
 
1145
            if parents_according_to_index != parents_according_to_revision:
 
1146
                yield (revid, parents_according_to_index,
 
1147
                    parents_according_to_revision)
 
1148
 
 
1149
    def _check_for_inconsistent_revision_parents(self):
 
1150
        inconsistencies = list(self._find_inconsistent_revision_parents())
 
1151
        if inconsistencies:
 
1152
            raise errors.BzrCheckError(
 
1153
                "Revision index has inconsistent parents.")
 
1154
 
 
1155
 
 
1156
class GroupCHKStreamSource(StreamSource):
 
1157
    """Used when both the source and target repo are GroupCHK repos."""
 
1158
 
 
1159
    def __init__(self, from_repository, to_format):
 
1160
        """Create a StreamSource streaming from from_repository."""
 
1161
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
 
1162
        self._revision_keys = None
 
1163
        self._text_keys = None
 
1164
        self._text_fetch_order = 'groupcompress'
 
1165
        self._chk_id_roots = None
 
1166
        self._chk_p_id_roots = None
 
1167
 
 
1168
    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
 
1169
        """Get a stream of inventory texts.
 
1170
 
 
1171
        When this function returns, self._chk_id_roots and self._chk_p_id_roots
 
1172
        should be populated.
 
1173
        """
 
1174
        self._chk_id_roots = []
 
1175
        self._chk_p_id_roots = []
 
1176
        def _filtered_inv_stream():
 
1177
            id_roots_set = set()
 
1178
            p_id_roots_set = set()
 
1179
            source_vf = self.from_repository.inventories
 
1180
            stream = source_vf.get_record_stream(inventory_keys,
 
1181
                                                 'groupcompress', True)
 
1182
            for record in stream:
 
1183
                if record.storage_kind == 'absent':
 
1184
                    if allow_absent:
 
1185
                        continue
 
1186
                    else:
 
1187
                        raise errors.NoSuchRevision(self, record.key)
 
1188
                bytes = record.get_bytes_as('fulltext')
 
1189
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
1190
                                                             record.key)
 
1191
                key = chk_inv.id_to_entry.key()
 
1192
                if key not in id_roots_set:
 
1193
                    self._chk_id_roots.append(key)
 
1194
                    id_roots_set.add(key)
 
1195
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
1196
                if p_id_map is None:
 
1197
                    raise AssertionError('Parent id -> file_id map not set')
 
1198
                key = p_id_map.key()
 
1199
                if key not in p_id_roots_set:
 
1200
                    p_id_roots_set.add(key)
 
1201
                    self._chk_p_id_roots.append(key)
 
1202
                yield record
 
1203
            # We have finished processing all of the inventory records, we
 
1204
            # don't need these sets anymore
 
1205
            id_roots_set.clear()
 
1206
            p_id_roots_set.clear()
 
1207
        return ('inventories', _filtered_inv_stream())
 
1208
 
 
1209
    def _get_filtered_chk_streams(self, excluded_revision_keys):
 
1210
        self._text_keys = set()
 
1211
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
 
1212
        if not excluded_revision_keys:
 
1213
            uninteresting_root_keys = set()
 
1214
            uninteresting_pid_root_keys = set()
 
1215
        else:
 
1216
            # filter out any excluded revisions whose inventories are not
 
1217
            # actually present
 
1218
            # TODO: Update Repository.iter_inventories() to add
 
1219
            #       ignore_missing=True
 
1220
            present_keys = self.from_repository._find_present_inventory_keys(
 
1221
                            excluded_revision_keys)
 
1222
            present_ids = [k[-1] for k in present_keys]
 
1223
            uninteresting_root_keys = set()
 
1224
            uninteresting_pid_root_keys = set()
 
1225
            for inv in self.from_repository.iter_inventories(present_ids):
 
1226
                uninteresting_root_keys.add(inv.id_to_entry.key())
 
1227
                uninteresting_pid_root_keys.add(
 
1228
                    inv.parent_id_basename_to_file_id.key())
 
1229
        chk_bytes = self.from_repository.chk_bytes
 
1230
        def _filter_id_to_entry():
 
1231
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
 
1232
                        self._chk_id_roots, uninteresting_root_keys)
 
1233
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
 
1234
                    chk_map._bytes_to_text_key):
 
1235
                if record is not None:
 
1236
                    yield record
 
1237
            # Consumed
 
1238
            self._chk_id_roots = None
 
1239
        yield 'chk_bytes', _filter_id_to_entry()
 
1240
        def _get_parent_id_basename_to_file_id_pages():
 
1241
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
 
1242
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
 
1243
                if record is not None:
 
1244
                    yield record
 
1245
            # Consumed
 
1246
            self._chk_p_id_roots = None
 
1247
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
 
1248
 
 
1249
    def _get_text_stream(self):
 
1250
        # Note: We know we don't have to handle adding root keys, because both
 
1251
        # the source and target are the identical network name.
 
1252
        text_stream = self.from_repository.texts.get_record_stream(
 
1253
                        self._text_keys, self._text_fetch_order, False)
 
1254
        return ('texts', text_stream)
 
1255
 
 
1256
    def get_stream(self, search):
 
1257
        def wrap_and_count(pb, rc, stream):
 
1258
            """Yield records from stream while showing progress."""
 
1259
            count = 0
 
1260
            for record in stream:
 
1261
                if count == rc.STEP:
 
1262
                    rc.increment(count)
 
1263
                    pb.update('Estimate', rc.current, rc.max)
 
1264
                    count = 0
 
1265
                count += 1
 
1266
                yield record
 
1267
 
 
1268
        revision_ids = search.get_keys()
 
1269
        pb = ui.ui_factory.nested_progress_bar()
 
1270
        rc = self._record_counter
 
1271
        self._record_counter.setup(len(revision_ids))
 
1272
        for stream_info in self._fetch_revision_texts(revision_ids):
 
1273
            yield (stream_info[0],
 
1274
                wrap_and_count(pb, rc, stream_info[1]))
 
1275
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
 
1276
        # TODO: The keys to exclude might be part of the search recipe
 
1277
        # For now, exclude all parents that are at the edge of ancestry, for
 
1278
        # which we have inventories
 
1279
        from_repo = self.from_repository
 
1280
        parent_keys = from_repo._find_parent_keys_of_revisions(
 
1281
                        self._revision_keys)
 
1282
        self.from_repository.revisions.clear_cache()
 
1283
        self.from_repository.signatures.clear_cache()
 
1284
        # Clear the repo's get_parent_map cache too.
 
1285
        self.from_repository._unstacked_provider.disable_cache()
 
1286
        self.from_repository._unstacked_provider.enable_cache()
 
1287
        s = self._get_inventory_stream(self._revision_keys)
 
1288
        yield (s[0], wrap_and_count(pb, rc, s[1]))
 
1289
        self.from_repository.inventories.clear_cache()
 
1290
        for stream_info in self._get_filtered_chk_streams(parent_keys):
 
1291
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
 
1292
        self.from_repository.chk_bytes.clear_cache()
 
1293
        s = self._get_text_stream()
 
1294
        yield (s[0], wrap_and_count(pb, rc, s[1]))
 
1295
        self.from_repository.texts.clear_cache()
 
1296
        pb.update('Done', rc.max, rc.max)
 
1297
        pb.finished()
 
1298
 
 
1299
    def get_stream_for_missing_keys(self, missing_keys):
 
1300
        # missing keys can only occur when we are byte copying and not
 
1301
        # translating (because translation means we don't send
 
1302
        # unreconstructable deltas ever).
 
1303
        missing_inventory_keys = set()
 
1304
        for key in missing_keys:
 
1305
            if key[0] != 'inventories':
 
1306
                raise AssertionError('The only missing keys we should'
 
1307
                    ' be filling in are inventory keys, not %s'
 
1308
                    % (key[0],))
 
1309
            missing_inventory_keys.add(key[1:])
 
1310
        if self._chk_id_roots or self._chk_p_id_roots:
 
1311
            raise AssertionError('Cannot call get_stream_for_missing_keys'
 
1312
                ' until all of get_stream() has been consumed.')
 
1313
        # Yield the inventory stream, so we can find the chk stream
 
1314
        # Some of the missing_keys will be missing because they are ghosts.
 
1315
        # As such, we can ignore them. The Sink is required to verify there are
 
1316
        # no unavailable texts when the ghost inventories are not filled in.
 
1317
        yield self._get_inventory_stream(missing_inventory_keys,
 
1318
                                         allow_absent=True)
 
1319
        # We use the empty set for excluded_revision_keys, to make it clear
 
1320
        # that we want to transmit all referenced chk pages.
 
1321
        for stream_info in self._get_filtered_chk_streams(set()):
 
1322
            yield stream_info
 
1323
 
 
1324
 
 
1325
class _InterestingKeyInfo(object):
 
1326
    def __init__(self):
 
1327
        self.interesting_root_keys = set()
 
1328
        self.interesting_pid_root_keys = set()
 
1329
        self.uninteresting_root_keys = set()
 
1330
        self.uninteresting_pid_root_keys = set()
 
1331
 
 
1332
    def all_interesting(self):
 
1333
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)
 
1334
 
 
1335
    def all_uninteresting(self):
 
1336
        return self.uninteresting_root_keys.union(
 
1337
            self.uninteresting_pid_root_keys)
 
1338
 
 
1339
    def all_keys(self):
 
1340
        return self.all_interesting().union(self.all_uninteresting())
 
1341
 
 
1342
 
 
1343
def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
 
1344
    result = _InterestingKeyInfo()
 
1345
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
 
1346
        root_key = inv.id_to_entry.key()
 
1347
        pid_root_key = inv.parent_id_basename_to_file_id.key()
 
1348
        if inv.revision_id in parent_only_inv_ids:
 
1349
            result.uninteresting_root_keys.add(root_key)
 
1350
            result.uninteresting_pid_root_keys.add(pid_root_key)
 
1351
        else:
 
1352
            result.interesting_root_keys.add(root_key)
 
1353
            result.interesting_pid_root_keys.add(pid_root_key)
 
1354
    return result
 
1355
 
 
1356
 
 
1357
def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
 
1358
    """Iterate the result of iter_interesting_nodes, yielding the records
 
1359
    and adding to text_keys.
 
1360
    """
 
1361
    text_keys_update = text_keys.update
 
1362
    for record, items in interesting_nodes_iterable:
 
1363
        text_keys_update([bytes_to_text_key(b) for n,b in items])
 
1364
        yield record
 
1365
 
 
1366
 
 
1367
class RepositoryFormat2a(RepositoryFormatPack):
 
1368
    """A CHK repository that uses the bencode revision serializer."""
 
1369
 
 
1370
    repository_class = CHKInventoryRepository
 
1371
    supports_external_lookups = True
 
1372
    supports_chks = True
 
1373
    _commit_builder_class = PackRootCommitBuilder
 
1374
    rich_root_data = True
 
1375
    _serializer = chk_serializer.chk_bencode_serializer
 
1376
    _commit_inv_deltas = True
 
1377
    # What index classes to use
 
1378
    index_builder_class = BTreeBuilder
 
1379
    index_class = BTreeGraphIndex
 
1380
    # Note: We cannot unpack a delta that references a text we haven't
 
1381
    # seen yet. There are 2 options, work in fulltexts, or require
 
1382
    # topological sorting. Using fulltexts is more optimal for local
 
1383
    # operations, because the source can be smart about extracting
 
1384
    # multiple in-a-row (and sharing strings). Topological is better
 
1385
    # for remote, because we access less data.
 
1386
    _fetch_order = 'unordered'
 
1387
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
 
1388
    fast_deltas = True
 
1389
    pack_compresses = True
 
1390
 
 
1391
    def _get_matching_bzrdir(self):
 
1392
        return controldir.format_registry.make_bzrdir('2a')
 
1393
 
 
1394
    def _ignore_setting_bzrdir(self, format):
 
1395
        pass
 
1396
 
 
1397
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
1398
 
 
1399
    @classmethod
 
1400
    def get_format_string(cls):
 
1401
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
 
1402
 
 
1403
    def get_format_description(self):
 
1404
        """See RepositoryFormat.get_format_description()."""
 
1405
        return ("Repository format 2a - rich roots, group compression"
 
1406
            " and chk inventories")
 
1407
 
 
1408
 
 
1409
class RepositoryFormat2aSubtree(RepositoryFormat2a):
 
1410
    """A 2a repository format that supports nested trees.
 
1411
 
 
1412
    """
 
1413
 
 
1414
    def _get_matching_bzrdir(self):
 
1415
        return controldir.format_registry.make_bzrdir('development-subtree')
 
1416
 
 
1417
    def _ignore_setting_bzrdir(self, format):
 
1418
        pass
 
1419
 
 
1420
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
1421
 
 
1422
    @classmethod
 
1423
    def get_format_string(cls):
 
1424
        return ('Bazaar development format 8\n')
 
1425
 
 
1426
    def get_format_description(self):
 
1427
        """See RepositoryFormat.get_format_description()."""
 
1428
        return ("Development repository format 8 - nested trees, "
 
1429
                "group compression and chk inventories")
 
1430
 
 
1431
    experimental = True
 
1432
    supports_tree_reference = True