/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/chk_map.py

  • Committer: Robert Collins
  • Date: 2005-10-19 10:11:57 UTC
  • mfrom: (1185.16.78)
  • mto: This revision was merged to the branch mainline in revision 1470.
  • Revision ID: robertc@robertcollins.net-20051019101157-17438d311e746b4f
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Persistent maps from tuple_of_strings->string using CHK stores.
18
 
 
19
 
Overview and current status:
20
 
 
21
 
The CHKMap class implements a dict from tuple_of_strings->string by using a trie
22
 
with internal nodes of 8-bit fan out; The key tuples are mapped to strings by
23
 
joining them by \x00, and \x00 padding shorter keys out to the length of the
24
 
longest key. Leaf nodes are packed as densely as possible, and internal nodes
25
 
are all an additional 8-bits wide leading to a sparse upper tree.
26
 
 
27
 
Updates to a CHKMap are done preferentially via the apply_delta method, to
28
 
allow optimisation of the update operation; but individual map/unmap calls are
29
 
possible and supported. Individual changes via map/unmap are buffered in memory
30
 
until the _save method is called to force serialisation of the tree.
31
 
apply_delta records its changes immediately by performing an implicit _save.
32
 
 
33
 
TODO:
34
 
-----
35
 
 
36
 
Densely packed upper nodes.
37
 
 
38
 
"""
39
 
 
40
 
from __future__ import absolute_import
41
 
 
42
 
import heapq
43
 
import threading
44
 
 
45
 
from .. import (
46
 
    errors,
47
 
    lru_cache,
48
 
    osutils,
49
 
    registry,
50
 
    static_tuple,
51
 
    trace,
52
 
    )
53
 
from ..sixish import (
54
 
    viewitems,
55
 
    viewvalues,
56
 
    )
57
 
from ..sixish import PY3
58
 
from ..static_tuple import StaticTuple
59
 
 
60
 
# approx 4MB
61
 
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
62
 
# out, it takes 3.1MB to cache the layer.
63
 
_PAGE_CACHE_SIZE = 4 * 1024 * 1024
64
 
# Per thread caches for 2 reasons:
65
 
# - in the server we may be serving very different content, so we get less
66
 
#   cache thrashing.
67
 
# - we avoid locking on every cache lookup.
68
 
_thread_caches = threading.local()
69
 
# The page cache.
70
 
_thread_caches.page_cache = None
71
 
 
72
 
 
73
 
def _get_cache():
74
 
    """Get the per-thread page cache.
75
 
 
76
 
    We need a function to do this because in a new thread the _thread_caches
77
 
    threading.local object does not have the cache initialized yet.
78
 
    """
79
 
    page_cache = getattr(_thread_caches, 'page_cache', None)
80
 
    if page_cache is None:
81
 
        # We are caching bytes so len(value) is perfectly accurate
82
 
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
83
 
        _thread_caches.page_cache = page_cache
84
 
    return page_cache
85
 
 
86
 
 
87
 
def clear_cache():
88
 
    _get_cache().clear()
89
 
 
90
 
 
91
 
# If a ChildNode falls below this many bytes, we check for a remap
92
 
_INTERESTING_NEW_SIZE = 50
93
 
# If a ChildNode shrinks by more than this amount, we check for a remap
94
 
_INTERESTING_SHRINKAGE_LIMIT = 20
95
 
 
96
 
 
97
 
def _search_key_plain(key):
98
 
    """Map the key tuple into a search string that just uses the key bytes."""
99
 
    return b'\x00'.join(key)
100
 
 
101
 
 
102
 
search_key_registry = registry.Registry()
103
 
search_key_registry.register(b'plain', _search_key_plain)
104
 
 
105
 
 
106
 
class CHKMap(object):
107
 
    """A persistent map from string to string backed by a CHK store."""
108
 
 
109
 
    __slots__ = ('_store', '_root_node', '_search_key_func')
110
 
 
111
 
    def __init__(self, store, root_key, search_key_func=None):
112
 
        """Create a CHKMap object.
113
 
 
114
 
        :param store: The store the CHKMap is stored in.
115
 
        :param root_key: The root key of the map. None to create an empty
116
 
            CHKMap.
117
 
        :param search_key_func: A function mapping a key => bytes. These bytes
118
 
            are then used by the internal nodes to split up leaf nodes into
119
 
            multiple pages.
120
 
        """
121
 
        self._store = store
122
 
        if search_key_func is None:
123
 
            search_key_func = _search_key_plain
124
 
        self._search_key_func = search_key_func
125
 
        if root_key is None:
126
 
            self._root_node = LeafNode(search_key_func=search_key_func)
127
 
        else:
128
 
            self._root_node = self._node_key(root_key)
129
 
 
130
 
    def apply_delta(self, delta):
131
 
        """Apply a delta to the map.
132
 
 
133
 
        :param delta: An iterable of old_key, new_key, new_value tuples.
134
 
            If new_key is not None, then new_key->new_value is inserted
135
 
            into the map; if old_key is not None, then the old mapping
136
 
            of old_key is removed.
137
 
        """
138
 
        has_deletes = False
139
 
        # Check preconditions first.
140
 
        as_st = StaticTuple.from_sequence
141
 
        new_items = {as_st(key) for (old, key, value) in delta
142
 
                     if key is not None and old is None}
143
 
        existing_new = list(self.iteritems(key_filter=new_items))
144
 
        if existing_new:
145
 
            raise errors.InconsistentDeltaDelta(delta,
146
 
                                                "New items are already in the map %r." % existing_new)
147
 
        # Now apply changes.
148
 
        for old, new, value in delta:
149
 
            if old is not None and old != new:
150
 
                self.unmap(old, check_remap=False)
151
 
                has_deletes = True
152
 
        for old, new, value in delta:
153
 
            if new is not None:
154
 
                self.map(new, value)
155
 
        if has_deletes:
156
 
            self._check_remap()
157
 
        return self._save()
158
 
 
159
 
    def _ensure_root(self):
160
 
        """Ensure that the root node is an object not a key."""
161
 
        if isinstance(self._root_node, StaticTuple):
162
 
            # Demand-load the root
163
 
            self._root_node = self._get_node(self._root_node)
164
 
 
165
 
    def _get_node(self, node):
166
 
        """Get a node.
167
 
 
168
 
        Note that this does not update the _items dict in objects containing a
169
 
        reference to this node. As such it does not prevent subsequent IO being
170
 
        performed.
171
 
 
172
 
        :param node: A tuple key or node object.
173
 
        :return: A node object.
174
 
        """
175
 
        if isinstance(node, StaticTuple):
176
 
            bytes = self._read_bytes(node)
177
 
            return _deserialise(bytes, node,
178
 
                                search_key_func=self._search_key_func)
179
 
        else:
180
 
            return node
181
 
 
182
 
    def _read_bytes(self, key):
183
 
        try:
184
 
            return _get_cache()[key]
185
 
        except KeyError:
186
 
            stream = self._store.get_record_stream([key], 'unordered', True)
187
 
            bytes = next(stream).get_bytes_as('fulltext')
188
 
            _get_cache()[key] = bytes
189
 
            return bytes
190
 
 
191
 
    def _dump_tree(self, include_keys=False, encoding='utf-8'):
192
 
        """Return the tree in a string representation."""
193
 
        self._ensure_root()
194
 
        if PY3:
195
 
            def decode(x): return x.decode(encoding)
196
 
        else:
197
 
            def decode(x): return x
198
 
        res = self._dump_tree_node(self._root_node, prefix=b'', indent='',
199
 
                                   decode=decode, include_keys=include_keys)
200
 
        res.append('')  # Give a trailing '\n'
201
 
        return '\n'.join(res)
202
 
 
203
 
    def _dump_tree_node(self, node, prefix, indent, decode, include_keys=True):
204
 
        """For this node and all children, generate a string representation."""
205
 
        result = []
206
 
        if not include_keys:
207
 
            key_str = ''
208
 
        else:
209
 
            node_key = node.key()
210
 
            if node_key is not None:
211
 
                key_str = ' %s' % (decode(node_key[0]),)
212
 
            else:
213
 
                key_str = ' None'
214
 
        result.append('%s%r %s%s' % (indent, decode(prefix), node.__class__.__name__,
215
 
                                     key_str))
216
 
        if isinstance(node, InternalNode):
217
 
            # Trigger all child nodes to get loaded
218
 
            list(node._iter_nodes(self._store))
219
 
            for prefix, sub in sorted(viewitems(node._items)):
220
 
                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
221
 
                                                   decode=decode, include_keys=include_keys))
222
 
        else:
223
 
            for key, value in sorted(viewitems(node._items)):
224
 
                # Don't use prefix nor indent here to line up when used in
225
 
                # tests in conjunction with assertEqualDiff
226
 
                result.append('      %r %r' % (
227
 
                    tuple([decode(ke) for ke in key]), decode(value)))
228
 
        return result
229
 
 
230
 
    @classmethod
231
 
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
232
 
                  search_key_func=None):
233
 
        """Create a CHKMap in store with initial_value as the content.
234
 
 
235
 
        :param store: The store to record initial_value in, a VersionedFiles
236
 
            object with 1-tuple keys supporting CHK key generation.
237
 
        :param initial_value: A dict to store in store. Its keys and values
238
 
            must be bytestrings.
239
 
        :param maximum_size: The maximum_size rule to apply to nodes. This
240
 
            determines the size at which no new data is added to a single node.
241
 
        :param key_width: The number of elements in each key_tuple being stored
242
 
            in this map.
243
 
        :param search_key_func: A function mapping a key => bytes. These bytes
244
 
            are then used by the internal nodes to split up leaf nodes into
245
 
            multiple pages.
246
 
        :return: The root chk of the resulting CHKMap.
247
 
        """
248
 
        root_key = klass._create_directly(store, initial_value,
249
 
                                          maximum_size=maximum_size, key_width=key_width,
250
 
                                          search_key_func=search_key_func)
251
 
        if not isinstance(root_key, StaticTuple):
252
 
            raise AssertionError('we got a %s instead of a StaticTuple'
253
 
                                 % (type(root_key),))
254
 
        return root_key
255
 
 
256
 
    @classmethod
257
 
    def _create_via_map(klass, store, initial_value, maximum_size=0,
258
 
                        key_width=1, search_key_func=None):
259
 
        result = klass(store, None, search_key_func=search_key_func)
260
 
        result._root_node.set_maximum_size(maximum_size)
261
 
        result._root_node._key_width = key_width
262
 
        delta = []
263
 
        for key, value in viewitems(initial_value):
264
 
            delta.append((None, key, value))
265
 
        root_key = result.apply_delta(delta)
266
 
        return root_key
267
 
 
268
 
    @classmethod
269
 
    def _create_directly(klass, store, initial_value, maximum_size=0,
270
 
                         key_width=1, search_key_func=None):
271
 
        node = LeafNode(search_key_func=search_key_func)
272
 
        node.set_maximum_size(maximum_size)
273
 
        node._key_width = key_width
274
 
        as_st = StaticTuple.from_sequence
275
 
        node._items = dict((as_st(key), val)
276
 
                           for key, val in viewitems(initial_value))
277
 
        node._raw_size = sum(node._key_value_len(key, value)
278
 
                             for key, value in viewitems(node._items))
279
 
        node._len = len(node._items)
280
 
        node._compute_search_prefix()
281
 
        node._compute_serialised_prefix()
282
 
        if (node._len > 1 and
283
 
            maximum_size and
284
 
                node._current_size() > maximum_size):
285
 
            prefix, node_details = node._split(store)
286
 
            if len(node_details) == 1:
287
 
                raise AssertionError('Failed to split using node._split')
288
 
            node = InternalNode(prefix, search_key_func=search_key_func)
289
 
            node.set_maximum_size(maximum_size)
290
 
            node._key_width = key_width
291
 
            for split, subnode in node_details:
292
 
                node.add_node(split, subnode)
293
 
        keys = list(node.serialise(store))
294
 
        return keys[-1]
295
 
 
296
 
    def iter_changes(self, basis):
297
 
        """Iterate over the changes between basis and self.
298
 
 
299
 
        :return: An iterator of tuples: (key, old_value, new_value). Old_value
300
 
            is None for keys only in self; new_value is None for keys only in
301
 
            basis.
302
 
        """
303
 
        # Overview:
304
 
        # Read both trees in lexographic, highest-first order.
305
 
        # Any identical nodes we skip
306
 
        # Any unique prefixes we output immediately.
307
 
        # values in a leaf node are treated as single-value nodes in the tree
308
 
        # which allows them to be not-special-cased. We know to output them
309
 
        # because their value is a string, not a key(tuple) or node.
310
 
        #
311
 
        # corner cases to beware of when considering this function:
312
 
        # *) common references are at different heights.
313
 
        #    consider two trees:
314
 
        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
315
 
        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
316
 
        #                        'ab':LeafNode={'ab':'bar'}}
317
 
        #     'b': LeafNode={'b'}}
318
 
        #    the node with aaa/aab will only be encountered in the second tree
319
 
        #    after reading the 'a' subtree, but it is encountered in the first
320
 
        #    tree immediately. Variations on this may have read internal nodes
321
 
        #    like this.  we want to cut the entire pending subtree when we
322
 
        #    realise we have a common node.  For this we use a list of keys -
323
 
        #    the path to a node - and check the entire path is clean as we
324
 
        #    process each item.
325
 
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
326
 
            return
327
 
        self._ensure_root()
328
 
        basis._ensure_root()
329
 
        excluded_keys = set()
330
 
        self_node = self._root_node
331
 
        basis_node = basis._root_node
332
 
        # A heap, each element is prefix, node(tuple/NodeObject/string),
333
 
        # key_path (a list of tuples, tail-sharing down the tree.)
334
 
        self_pending = []
335
 
        basis_pending = []
336
 
 
337
 
        def process_node(node, path, a_map, pending):
338
 
            # take a node and expand it
339
 
            node = a_map._get_node(node)
340
 
            if isinstance(node, LeafNode):
341
 
                path = (node._key, path)
342
 
                for key, value in viewitems(node._items):
343
 
                    # For a LeafNode, the key is a serialized_key, rather than
344
 
                    # a search_key, but the heap is using search_keys
345
 
                    search_key = node._search_key_func(key)
346
 
                    heapq.heappush(pending, (search_key, key, value, path))
347
 
            else:
348
 
                # type(node) == InternalNode
349
 
                path = (node._key, path)
350
 
                for prefix, child in viewitems(node._items):
351
 
                    heapq.heappush(pending, (prefix, None, child, path))
352
 
 
353
 
        def process_common_internal_nodes(self_node, basis_node):
354
 
            self_items = set(viewitems(self_node._items))
355
 
            basis_items = set(viewitems(basis_node._items))
356
 
            path = (self_node._key, None)
357
 
            for prefix, child in self_items - basis_items:
358
 
                heapq.heappush(self_pending, (prefix, None, child, path))
359
 
            path = (basis_node._key, None)
360
 
            for prefix, child in basis_items - self_items:
361
 
                heapq.heappush(basis_pending, (prefix, None, child, path))
362
 
 
363
 
        def process_common_leaf_nodes(self_node, basis_node):
364
 
            self_items = set(viewitems(self_node._items))
365
 
            basis_items = set(viewitems(basis_node._items))
366
 
            path = (self_node._key, None)
367
 
            for key, value in self_items - basis_items:
368
 
                prefix = self._search_key_func(key)
369
 
                heapq.heappush(self_pending, (prefix, key, value, path))
370
 
            path = (basis_node._key, None)
371
 
            for key, value in basis_items - self_items:
372
 
                prefix = basis._search_key_func(key)
373
 
                heapq.heappush(basis_pending, (prefix, key, value, path))
374
 
 
375
 
        def process_common_prefix_nodes(self_node, self_path,
376
 
                                        basis_node, basis_path):
377
 
            # Would it be more efficient if we could request both at the same
378
 
            # time?
379
 
            self_node = self._get_node(self_node)
380
 
            basis_node = basis._get_node(basis_node)
381
 
            if (isinstance(self_node, InternalNode) and
382
 
                    isinstance(basis_node, InternalNode)):
383
 
                # Matching internal nodes
384
 
                process_common_internal_nodes(self_node, basis_node)
385
 
            elif (isinstance(self_node, LeafNode) and
386
 
                  isinstance(basis_node, LeafNode)):
387
 
                process_common_leaf_nodes(self_node, basis_node)
388
 
            else:
389
 
                process_node(self_node, self_path, self, self_pending)
390
 
                process_node(basis_node, basis_path, basis, basis_pending)
391
 
        process_common_prefix_nodes(self_node, None, basis_node, None)
392
 
        self_seen = set()
393
 
        basis_seen = set()
394
 
        excluded_keys = set()
395
 
 
396
 
        def check_excluded(key_path):
397
 
            # Note that this is N^2, it depends on us trimming trees
398
 
            # aggressively to not become slow.
399
 
            # A better implementation would probably have a reverse map
400
 
            # back to the children of a node, and jump straight to it when
401
 
            # a common node is detected, the proceed to remove the already
402
 
            # pending children. breezy.graph has a searcher module with a
403
 
            # similar problem.
404
 
            while key_path is not None:
405
 
                key, key_path = key_path
406
 
                if key in excluded_keys:
407
 
                    return True
408
 
            return False
409
 
 
410
 
        loop_counter = 0
411
 
        while self_pending or basis_pending:
412
 
            loop_counter += 1
413
 
            if not self_pending:
414
 
                # self is exhausted: output remainder of basis
415
 
                for prefix, key, node, path in basis_pending:
416
 
                    if check_excluded(path):
417
 
                        continue
418
 
                    node = basis._get_node(node)
419
 
                    if key is not None:
420
 
                        # a value
421
 
                        yield (key, node, None)
422
 
                    else:
423
 
                        # subtree - fastpath the entire thing.
424
 
                        for key, value in node.iteritems(basis._store):
425
 
                            yield (key, value, None)
426
 
                return
427
 
            elif not basis_pending:
428
 
                # basis is exhausted: output remainder of self.
429
 
                for prefix, key, node, path in self_pending:
430
 
                    if check_excluded(path):
431
 
                        continue
432
 
                    node = self._get_node(node)
433
 
                    if key is not None:
434
 
                        # a value
435
 
                        yield (key, None, node)
436
 
                    else:
437
 
                        # subtree - fastpath the entire thing.
438
 
                        for key, value in node.iteritems(self._store):
439
 
                            yield (key, None, value)
440
 
                return
441
 
            else:
442
 
                # XXX: future optimisation - yield the smaller items
443
 
                # immediately rather than pushing everything on/off the
444
 
                # heaps. Applies to both internal nodes and leafnodes.
445
 
                if self_pending[0][0] < basis_pending[0][0]:
446
 
                    # expand self
447
 
                    prefix, key, node, path = heapq.heappop(self_pending)
448
 
                    if check_excluded(path):
449
 
                        continue
450
 
                    if key is not None:
451
 
                        # a value
452
 
                        yield (key, None, node)
453
 
                    else:
454
 
                        process_node(node, path, self, self_pending)
455
 
                        continue
456
 
                elif self_pending[0][0] > basis_pending[0][0]:
457
 
                    # expand basis
458
 
                    prefix, key, node, path = heapq.heappop(basis_pending)
459
 
                    if check_excluded(path):
460
 
                        continue
461
 
                    if key is not None:
462
 
                        # a value
463
 
                        yield (key, node, None)
464
 
                    else:
465
 
                        process_node(node, path, basis, basis_pending)
466
 
                        continue
467
 
                else:
468
 
                    # common prefix: possibly expand both
469
 
                    if self_pending[0][1] is None:
470
 
                        # process next self
471
 
                        read_self = True
472
 
                    else:
473
 
                        read_self = False
474
 
                    if basis_pending[0][1] is None:
475
 
                        # process next basis
476
 
                        read_basis = True
477
 
                    else:
478
 
                        read_basis = False
479
 
                    if not read_self and not read_basis:
480
 
                        # compare a common value
481
 
                        self_details = heapq.heappop(self_pending)
482
 
                        basis_details = heapq.heappop(basis_pending)
483
 
                        if self_details[2] != basis_details[2]:
484
 
                            yield (self_details[1],
485
 
                                   basis_details[2], self_details[2])
486
 
                        continue
487
 
                    # At least one side wasn't a simple value
488
 
                    if (self._node_key(self_pending[0][2])
489
 
                            == self._node_key(basis_pending[0][2])):
490
 
                        # Identical pointers, skip (and don't bother adding to
491
 
                        # excluded, it won't turn up again.
492
 
                        heapq.heappop(self_pending)
493
 
                        heapq.heappop(basis_pending)
494
 
                        continue
495
 
                    # Now we need to expand this node before we can continue
496
 
                    if read_self and read_basis:
497
 
                        # Both sides start with the same prefix, so process
498
 
                        # them in parallel
499
 
                        self_prefix, _, self_node, self_path = heapq.heappop(
500
 
                            self_pending)
501
 
                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
502
 
                            basis_pending)
503
 
                        if self_prefix != basis_prefix:
504
 
                            raise AssertionError(
505
 
                                '%r != %r' % (self_prefix, basis_prefix))
506
 
                        process_common_prefix_nodes(
507
 
                            self_node, self_path,
508
 
                            basis_node, basis_path)
509
 
                        continue
510
 
                    if read_self:
511
 
                        prefix, key, node, path = heapq.heappop(self_pending)
512
 
                        if check_excluded(path):
513
 
                            continue
514
 
                        process_node(node, path, self, self_pending)
515
 
                    if read_basis:
516
 
                        prefix, key, node, path = heapq.heappop(basis_pending)
517
 
                        if check_excluded(path):
518
 
                            continue
519
 
                        process_node(node, path, basis, basis_pending)
520
 
        # print loop_counter
521
 
 
522
 
    def iteritems(self, key_filter=None):
523
 
        """Iterate over the entire CHKMap's contents."""
524
 
        self._ensure_root()
525
 
        if key_filter is not None:
526
 
            as_st = StaticTuple.from_sequence
527
 
            key_filter = [as_st(key) for key in key_filter]
528
 
        return self._root_node.iteritems(self._store, key_filter=key_filter)
529
 
 
530
 
    def key(self):
531
 
        """Return the key for this map."""
532
 
        if isinstance(self._root_node, StaticTuple):
533
 
            return self._root_node
534
 
        else:
535
 
            return self._root_node._key
536
 
 
537
 
    def __len__(self):
538
 
        self._ensure_root()
539
 
        return len(self._root_node)
540
 
 
541
 
    def map(self, key, value):
542
 
        """Map a key tuple to value.
543
 
 
544
 
        :param key: A key to map.
545
 
        :param value: The value to assign to key.
546
 
        """
547
 
        key = StaticTuple.from_sequence(key)
548
 
        # Need a root object.
549
 
        self._ensure_root()
550
 
        prefix, node_details = self._root_node.map(self._store, key, value)
551
 
        if len(node_details) == 1:
552
 
            self._root_node = node_details[0][1]
553
 
        else:
554
 
            self._root_node = InternalNode(prefix,
555
 
                                           search_key_func=self._search_key_func)
556
 
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
557
 
            self._root_node._key_width = node_details[0][1]._key_width
558
 
            for split, node in node_details:
559
 
                self._root_node.add_node(split, node)
560
 
 
561
 
    def _node_key(self, node):
562
 
        """Get the key for a node whether it's a tuple or node."""
563
 
        if isinstance(node, tuple):
564
 
            node = StaticTuple.from_sequence(node)
565
 
        if isinstance(node, StaticTuple):
566
 
            return node
567
 
        else:
568
 
            return node._key
569
 
 
570
 
    def unmap(self, key, check_remap=True):
571
 
        """remove key from the map."""
572
 
        key = StaticTuple.from_sequence(key)
573
 
        self._ensure_root()
574
 
        if isinstance(self._root_node, InternalNode):
575
 
            unmapped = self._root_node.unmap(self._store, key,
576
 
                                             check_remap=check_remap)
577
 
        else:
578
 
            unmapped = self._root_node.unmap(self._store, key)
579
 
        self._root_node = unmapped
580
 
 
581
 
    def _check_remap(self):
582
 
        """Check if nodes can be collapsed."""
583
 
        self._ensure_root()
584
 
        if isinstance(self._root_node, InternalNode):
585
 
            self._root_node = self._root_node._check_remap(self._store)
586
 
 
587
 
    def _save(self):
588
 
        """Save the map completely.
589
 
 
590
 
        :return: The key of the root node.
591
 
        """
592
 
        if isinstance(self._root_node, StaticTuple):
593
 
            # Already saved.
594
 
            return self._root_node
595
 
        keys = list(self._root_node.serialise(self._store))
596
 
        return keys[-1]
597
 
 
598
 
 
599
 
class Node(object):
600
 
    """Base class defining the protocol for CHK Map nodes.
601
 
 
602
 
    :ivar _raw_size: The total size of the serialized key:value data, before
603
 
        adding the header bytes, and without prefix compression.
604
 
    """
605
 
 
606
 
    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
607
 
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
608
 
                 )
609
 
 
610
 
    def __init__(self, key_width=1):
611
 
        """Create a node.
612
 
 
613
 
        :param key_width: The width of keys for this node.
614
 
        """
615
 
        self._key = None
616
 
        # Current number of elements
617
 
        self._len = 0
618
 
        self._maximum_size = 0
619
 
        self._key_width = key_width
620
 
        # current size in bytes
621
 
        self._raw_size = 0
622
 
        # The pointers/values this node has - meaning defined by child classes.
623
 
        self._items = {}
624
 
        # The common search prefix
625
 
        self._search_prefix = None
626
 
 
627
 
    def __repr__(self):
628
 
        items_str = str(sorted(self._items))
629
 
        if len(items_str) > 20:
630
 
            items_str = items_str[:16] + '...]'
631
 
        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
632
 
            self.__class__.__name__, self._key, self._len, self._raw_size,
633
 
            self._maximum_size, self._search_prefix, items_str)
634
 
 
635
 
    def key(self):
636
 
        return self._key
637
 
 
638
 
    def __len__(self):
639
 
        return self._len
640
 
 
641
 
    @property
642
 
    def maximum_size(self):
643
 
        """What is the upper limit for adding references to a node."""
644
 
        return self._maximum_size
645
 
 
646
 
    def set_maximum_size(self, new_size):
647
 
        """Set the size threshold for nodes.
648
 
 
649
 
        :param new_size: The size at which no data is added to a node. 0 for
650
 
            unlimited.
651
 
        """
652
 
        self._maximum_size = new_size
653
 
 
654
 
    @classmethod
655
 
    def common_prefix(cls, prefix, key):
656
 
        """Given 2 strings, return the longest prefix common to both.
657
 
 
658
 
        :param prefix: This has been the common prefix for other keys, so it is
659
 
            more likely to be the common prefix in this case as well.
660
 
        :param key: Another string to compare to
661
 
        """
662
 
        if key.startswith(prefix):
663
 
            return prefix
664
 
        pos = -1
665
 
        # Is there a better way to do this?
666
 
        for pos, (left, right) in enumerate(zip(prefix, key)):
667
 
            if left != right:
668
 
                pos -= 1
669
 
                break
670
 
        common = prefix[:pos + 1]
671
 
        return common
672
 
 
673
 
    @classmethod
674
 
    def common_prefix_for_keys(cls, keys):
675
 
        """Given a list of keys, find their common prefix.
676
 
 
677
 
        :param keys: An iterable of strings.
678
 
        :return: The longest common prefix of all keys.
679
 
        """
680
 
        common_prefix = None
681
 
        for key in keys:
682
 
            if common_prefix is None:
683
 
                common_prefix = key
684
 
                continue
685
 
            common_prefix = cls.common_prefix(common_prefix, key)
686
 
            if not common_prefix:
687
 
                # if common_prefix is the empty string, then we know it won't
688
 
                # change further
689
 
                return b''
690
 
        return common_prefix
691
 
 
692
 
 
693
 
# Singleton indicating we have not computed _search_prefix yet
694
 
_unknown = object()
695
 
 
696
 
 
697
 
class LeafNode(Node):
698
 
    """A node containing actual key:value pairs.
699
 
 
700
 
    :ivar _items: A dict of key->value items. The key is in tuple form.
701
 
    :ivar _size: The number of bytes that would be used by serializing all of
702
 
        the key/value pairs.
703
 
    """
704
 
 
705
 
    __slots__ = ('_common_serialised_prefix',)
706
 
 
707
 
    def __init__(self, search_key_func=None):
708
 
        Node.__init__(self)
709
 
        # All of the keys in this leaf node share this common prefix
710
 
        self._common_serialised_prefix = None
711
 
        if search_key_func is None:
712
 
            self._search_key_func = _search_key_plain
713
 
        else:
714
 
            self._search_key_func = search_key_func
715
 
 
716
 
    def __repr__(self):
717
 
        items_str = str(sorted(self._items))
718
 
        if len(items_str) > 20:
719
 
            items_str = items_str[:16] + '...]'
720
 
        return \
721
 
            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
722
 
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
723
 
               self._maximum_size, self._search_prefix, self._key_width, items_str)
724
 
 
725
 
    def _current_size(self):
726
 
        """Answer the current serialised size of this node.
727
 
 
728
 
        This differs from self._raw_size in that it includes the bytes used for
729
 
        the header.
730
 
        """
731
 
        if self._common_serialised_prefix is None:
732
 
            bytes_for_items = 0
733
 
            prefix_len = 0
734
 
        else:
735
 
            # We will store a single string with the common prefix
736
 
            # And then that common prefix will not be stored in any of the
737
 
            # entry lines
738
 
            prefix_len = len(self._common_serialised_prefix)
739
 
            bytes_for_items = (self._raw_size - (prefix_len * self._len))
740
 
        return (9 +  # 'chkleaf:\n' +
741
 
                len(str(self._maximum_size)) + 1 +
742
 
                len(str(self._key_width)) + 1 +
743
 
                len(str(self._len)) + 1 +
744
 
                prefix_len + 1 +
745
 
                bytes_for_items)
746
 
 
747
 
    @classmethod
748
 
    def deserialise(klass, bytes, key, search_key_func=None):
749
 
        """Deserialise bytes, with key key, into a LeafNode.
750
 
 
751
 
        :param bytes: The bytes of the node.
752
 
        :param key: The key that the serialised node has.
753
 
        """
754
 
        key = static_tuple.expect_static_tuple(key)
755
 
        return _deserialise_leaf_node(bytes, key,
756
 
                                      search_key_func=search_key_func)
757
 
 
758
 
    def iteritems(self, store, key_filter=None):
759
 
        """Iterate over items in the node.
760
 
 
761
 
        :param key_filter: A filter to apply to the node. It should be a
762
 
            list/set/dict or similar repeatedly iterable container.
763
 
        """
764
 
        if key_filter is not None:
765
 
            # Adjust the filter - short elements go to a prefix filter. All
766
 
            # other items are looked up directly.
767
 
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
768
 
            filters = {}
769
 
            for key in key_filter:
770
 
                if len(key) == self._key_width:
771
 
                    # This filter is meant to match exactly one key, yield it
772
 
                    # if we have it.
773
 
                    try:
774
 
                        yield key, self._items[key]
775
 
                    except KeyError:
776
 
                        # This key is not present in this map, continue
777
 
                        pass
778
 
                else:
779
 
                    # Short items, we need to match based on a prefix
780
 
                    filters.setdefault(len(key), set()).add(key)
781
 
            if filters:
782
 
                filters_itemview = viewitems(filters)
783
 
                for item in viewitems(self._items):
784
 
                    for length, length_filter in filters_itemview:
785
 
                        if item[0][:length] in length_filter:
786
 
                            yield item
787
 
                            break
788
 
        else:
789
 
            for item in viewitems(self._items):
790
 
                yield item
791
 
 
792
 
    def _key_value_len(self, key, value):
793
 
        # TODO: Should probably be done without actually joining the key, but
794
 
        #       then that can be done via the C extension
795
 
        return (len(self._serialise_key(key)) + 1 +
796
 
                len(b'%d' % value.count(b'\n')) + 1 +
797
 
                len(value) + 1)
798
 
 
799
 
    def _search_key(self, key):
800
 
        return self._search_key_func(key)
801
 
 
802
 
    def _map_no_split(self, key, value):
803
 
        """Map a key to a value.
804
 
 
805
 
        This assumes either the key does not already exist, or you have already
806
 
        removed its size and length from self.
807
 
 
808
 
        :return: True if adding this node should cause us to split.
809
 
        """
810
 
        self._items[key] = value
811
 
        self._raw_size += self._key_value_len(key, value)
812
 
        self._len += 1
813
 
        serialised_key = self._serialise_key(key)
814
 
        if self._common_serialised_prefix is None:
815
 
            self._common_serialised_prefix = serialised_key
816
 
        else:
817
 
            self._common_serialised_prefix = self.common_prefix(
818
 
                self._common_serialised_prefix, serialised_key)
819
 
        search_key = self._search_key(key)
820
 
        if self._search_prefix is _unknown:
821
 
            self._compute_search_prefix()
822
 
        if self._search_prefix is None:
823
 
            self._search_prefix = search_key
824
 
        else:
825
 
            self._search_prefix = self.common_prefix(
826
 
                self._search_prefix, search_key)
827
 
        if (self._len > 1 and
828
 
            self._maximum_size and
829
 
                self._current_size() > self._maximum_size):
830
 
            # Check to see if all of the search_keys for this node are
831
 
            # identical. We allow the node to grow under that circumstance
832
 
            # (we could track this as common state, but it is infrequent)
833
 
            if (search_key != self._search_prefix or
834
 
                    not self._are_search_keys_identical()):
835
 
                return True
836
 
        return False
837
 
 
838
 
    def _split(self, store):
839
 
        """We have overflowed.
840
 
 
841
 
        Split this node into multiple LeafNodes, return it up the stack so that
842
 
        the next layer creates a new InternalNode and references the new nodes.
843
 
 
844
 
        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
845
 
        """
846
 
        if self._search_prefix is _unknown:
847
 
            raise AssertionError('Search prefix must be known')
848
 
        common_prefix = self._search_prefix
849
 
        split_at = len(common_prefix) + 1
850
 
        result = {}
851
 
        for key, value in viewitems(self._items):
852
 
            search_key = self._search_key(key)
853
 
            prefix = search_key[:split_at]
854
 
            # TODO: Generally only 1 key can be exactly the right length,
855
 
            #       which means we can only have 1 key in the node pointed
856
 
            #       at by the 'prefix\0' key. We might want to consider
857
 
            #       folding it into the containing InternalNode rather than
858
 
            #       having a fixed length-1 node.
859
 
            #       Note this is probably not true for hash keys, as they
860
 
            #       may get a '\00' node anywhere, but won't have keys of
861
 
            #       different lengths.
862
 
            if len(prefix) < split_at:
863
 
                prefix += b'\x00' * (split_at - len(prefix))
864
 
            if prefix not in result:
865
 
                node = LeafNode(search_key_func=self._search_key_func)
866
 
                node.set_maximum_size(self._maximum_size)
867
 
                node._key_width = self._key_width
868
 
                result[prefix] = node
869
 
            else:
870
 
                node = result[prefix]
871
 
            sub_prefix, node_details = node.map(store, key, value)
872
 
            if len(node_details) > 1:
873
 
                if prefix != sub_prefix:
874
 
                    # This node has been split and is now found via a different
875
 
                    # path
876
 
                    result.pop(prefix)
877
 
                new_node = InternalNode(sub_prefix,
878
 
                                        search_key_func=self._search_key_func)
879
 
                new_node.set_maximum_size(self._maximum_size)
880
 
                new_node._key_width = self._key_width
881
 
                for split, node in node_details:
882
 
                    new_node.add_node(split, node)
883
 
                result[prefix] = new_node
884
 
        return common_prefix, list(viewitems(result))
885
 
 
886
 
    def map(self, store, key, value):
887
 
        """Map key to value."""
888
 
        if key in self._items:
889
 
            self._raw_size -= self._key_value_len(key, self._items[key])
890
 
            self._len -= 1
891
 
        self._key = None
892
 
        if self._map_no_split(key, value):
893
 
            return self._split(store)
894
 
        else:
895
 
            if self._search_prefix is _unknown:
896
 
                raise AssertionError('%r must be known' % self._search_prefix)
897
 
            return self._search_prefix, [(b"", self)]
898
 
 
899
 
    _serialise_key = b'\x00'.join
900
 
 
901
 
    def serialise(self, store):
902
 
        """Serialise the LeafNode to store.
903
 
 
904
 
        :param store: A VersionedFiles honouring the CHK extensions.
905
 
        :return: An iterable of the keys inserted by this operation.
906
 
        """
907
 
        lines = [b"chkleaf:\n"]
908
 
        lines.append(b"%d\n" % self._maximum_size)
909
 
        lines.append(b"%d\n" % self._key_width)
910
 
        lines.append(b"%d\n" % self._len)
911
 
        if self._common_serialised_prefix is None:
912
 
            lines.append(b'\n')
913
 
            if len(self._items) != 0:
914
 
                raise AssertionError('If _common_serialised_prefix is None'
915
 
                                     ' we should have no items')
916
 
        else:
917
 
            lines.append(b'%s\n' % (self._common_serialised_prefix,))
918
 
            prefix_len = len(self._common_serialised_prefix)
919
 
        for key, value in sorted(viewitems(self._items)):
920
 
            # Always add a final newline
921
 
            value_lines = osutils.chunks_to_lines([value + b'\n'])
922
 
            serialized = b"%s\x00%d\n" % (self._serialise_key(key),
923
 
                                          len(value_lines))
924
 
            if not serialized.startswith(self._common_serialised_prefix):
925
 
                raise AssertionError('We thought the common prefix was %r'
926
 
                                     ' but entry %r does not have it in common'
927
 
                                     % (self._common_serialised_prefix, serialized))
928
 
            lines.append(serialized[prefix_len:])
929
 
            lines.extend(value_lines)
930
 
        sha1, _, _ = store.add_lines((None,), (), lines)
931
 
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
932
 
        data = b''.join(lines)
933
 
        if len(data) != self._current_size():
934
 
            raise AssertionError('Invalid _current_size')
935
 
        _get_cache()[self._key] = data
936
 
        return [self._key]
937
 
 
938
 
    def refs(self):
939
 
        """Return the references to other CHK's held by this node."""
940
 
        return []
941
 
 
942
 
    def _compute_search_prefix(self):
943
 
        """Determine the common search prefix for all keys in this node.
944
 
 
945
 
        :return: A bytestring of the longest search key prefix that is
946
 
            unique within this node.
947
 
        """
948
 
        search_keys = [self._search_key_func(key) for key in self._items]
949
 
        self._search_prefix = self.common_prefix_for_keys(search_keys)
950
 
        return self._search_prefix
951
 
 
952
 
    def _are_search_keys_identical(self):
953
 
        """Check to see if the search keys for all entries are the same.
954
 
 
955
 
        When using a hash as the search_key it is possible for non-identical
956
 
        keys to collide. If that happens enough, we may try overflow a
957
 
        LeafNode, but as all are collisions, we must not split.
958
 
        """
959
 
        common_search_key = None
960
 
        for key in self._items:
961
 
            search_key = self._search_key(key)
962
 
            if common_search_key is None:
963
 
                common_search_key = search_key
964
 
            elif search_key != common_search_key:
965
 
                return False
966
 
        return True
967
 
 
968
 
    def _compute_serialised_prefix(self):
969
 
        """Determine the common prefix for serialised keys in this node.
970
 
 
971
 
        :return: A bytestring of the longest serialised key prefix that is
972
 
            unique within this node.
973
 
        """
974
 
        serialised_keys = [self._serialise_key(key) for key in self._items]
975
 
        self._common_serialised_prefix = self.common_prefix_for_keys(
976
 
            serialised_keys)
977
 
        return self._common_serialised_prefix
978
 
 
979
 
    def unmap(self, store, key):
980
 
        """Unmap key from the node."""
981
 
        try:
982
 
            self._raw_size -= self._key_value_len(key, self._items[key])
983
 
        except KeyError:
984
 
            trace.mutter("key %s not found in %r", key, self._items)
985
 
            raise
986
 
        self._len -= 1
987
 
        del self._items[key]
988
 
        self._key = None
989
 
        # Recompute from scratch
990
 
        self._compute_search_prefix()
991
 
        self._compute_serialised_prefix()
992
 
        return self
993
 
 
994
 
 
995
 
class InternalNode(Node):
996
 
    """A node that contains references to other nodes.
997
 
 
998
 
    An InternalNode is responsible for mapping search key prefixes to child
999
 
    nodes.
1000
 
 
1001
 
    :ivar _items: serialised_key => node dictionary. node may be a tuple,
1002
 
        LeafNode or InternalNode.
1003
 
    """
1004
 
 
1005
 
    __slots__ = ('_node_width',)
1006
 
 
1007
 
    def __init__(self, prefix=b'', search_key_func=None):
1008
 
        Node.__init__(self)
1009
 
        # The size of an internalnode with default values and no children.
1010
 
        # How many octets key prefixes within this node are.
1011
 
        self._node_width = 0
1012
 
        self._search_prefix = prefix
1013
 
        if search_key_func is None:
1014
 
            self._search_key_func = _search_key_plain
1015
 
        else:
1016
 
            self._search_key_func = search_key_func
1017
 
 
1018
 
    def add_node(self, prefix, node):
1019
 
        """Add a child node with prefix prefix, and node node.
1020
 
 
1021
 
        :param prefix: The search key prefix for node.
1022
 
        :param node: The node being added.
1023
 
        """
1024
 
        if self._search_prefix is None:
1025
 
            raise AssertionError("_search_prefix should not be None")
1026
 
        if not prefix.startswith(self._search_prefix):
1027
 
            raise AssertionError("prefixes mismatch: %s must start with %s"
1028
 
                                 % (prefix, self._search_prefix))
1029
 
        if len(prefix) != len(self._search_prefix) + 1:
1030
 
            raise AssertionError("prefix wrong length: len(%s) is not %d" %
1031
 
                                 (prefix, len(self._search_prefix) + 1))
1032
 
        self._len += len(node)
1033
 
        if not len(self._items):
1034
 
            self._node_width = len(prefix)
1035
 
        if self._node_width != len(self._search_prefix) + 1:
1036
 
            raise AssertionError("node width mismatch: %d is not %d" %
1037
 
                                 (self._node_width, len(self._search_prefix) + 1))
1038
 
        self._items[prefix] = node
1039
 
        self._key = None
1040
 
 
1041
 
    def _current_size(self):
1042
 
        """Answer the current serialised size of this node."""
1043
 
        return (self._raw_size + len(str(self._len)) + len(str(self._key_width))
1044
 
                + len(str(self._maximum_size)))
1045
 
 
1046
 
    @classmethod
1047
 
    def deserialise(klass, bytes, key, search_key_func=None):
1048
 
        """Deserialise bytes to an InternalNode, with key key.
1049
 
 
1050
 
        :param bytes: The bytes of the node.
1051
 
        :param key: The key that the serialised node has.
1052
 
        :return: An InternalNode instance.
1053
 
        """
1054
 
        key = static_tuple.expect_static_tuple(key)
1055
 
        return _deserialise_internal_node(bytes, key,
1056
 
                                          search_key_func=search_key_func)
1057
 
 
1058
 
    def iteritems(self, store, key_filter=None):
1059
 
        for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1060
 
            for item in node.iteritems(store, key_filter=node_filter):
1061
 
                yield item
1062
 
 
1063
 
    def _iter_nodes(self, store, key_filter=None, batch_size=None):
1064
 
        """Iterate over node objects which match key_filter.
1065
 
 
1066
 
        :param store: A store to use for accessing content.
1067
 
        :param key_filter: A key filter to filter nodes. Only nodes that might
1068
 
            contain a key in key_filter will be returned.
1069
 
        :param batch_size: If not None, then we will return the nodes that had
1070
 
            to be read using get_record_stream in batches, rather than reading
1071
 
            them all at once.
1072
 
        :return: An iterable of nodes. This function does not have to be fully
1073
 
            consumed.  (There will be no pending I/O when items are being returned.)
1074
 
        """
1075
 
        # Map from chk key ('sha1:...',) to (prefix, key_filter)
1076
 
        # prefix is the key in self._items to use, key_filter is the key_filter
1077
 
        # entries that would match this node
1078
 
        keys = {}
1079
 
        shortcut = False
1080
 
        if key_filter is None:
1081
 
            # yielding all nodes, yield whatever we have, and queue up a read
1082
 
            # for whatever we are missing
1083
 
            shortcut = True
1084
 
            for prefix, node in viewitems(self._items):
1085
 
                if node.__class__ is StaticTuple:
1086
 
                    keys[node] = (prefix, None)
1087
 
                else:
1088
 
                    yield node, None
1089
 
        elif len(key_filter) == 1:
1090
 
            # Technically, this path could also be handled by the first check
1091
 
            # in 'self._node_width' in length_filters. However, we can handle
1092
 
            # this case without spending any time building up the
1093
 
            # prefix_to_keys, etc state.
1094
 
 
1095
 
            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
1096
 
            # 0.626us   list(key_filter)[0]
1097
 
            #       is a func() for list(), 2 mallocs, and a getitem
1098
 
            # 0.489us   [k for k in key_filter][0]
1099
 
            #       still has the mallocs, avoids the func() call
1100
 
            # 0.350us   iter(key_filter).next()
1101
 
            #       has a func() call, and mallocs an iterator
1102
 
            # 0.125us   for key in key_filter: pass
1103
 
            #       no func() overhead, might malloc an iterator
1104
 
            # 0.105us   for key in key_filter: break
1105
 
            #       no func() overhead, might malloc an iterator, probably
1106
 
            #       avoids checking an 'else' clause as part of the for
1107
 
            for key in key_filter:
1108
 
                break
1109
 
            search_prefix = self._search_prefix_filter(key)
1110
 
            if len(search_prefix) == self._node_width:
1111
 
                # This item will match exactly, so just do a dict lookup, and
1112
 
                # see what we can return
1113
 
                shortcut = True
1114
 
                try:
1115
 
                    node = self._items[search_prefix]
1116
 
                except KeyError:
1117
 
                    # A given key can only match 1 child node, if it isn't
1118
 
                    # there, then we can just return nothing
1119
 
                    return
1120
 
                if node.__class__ is StaticTuple:
1121
 
                    keys[node] = (search_prefix, [key])
1122
 
                else:
1123
 
                    # This is loaded, and the only thing that can match,
1124
 
                    # return
1125
 
                    yield node, [key]
1126
 
                    return
1127
 
        if not shortcut:
1128
 
            # First, convert all keys into a list of search prefixes
1129
 
            # Aggregate common prefixes, and track the keys they come from
1130
 
            prefix_to_keys = {}
1131
 
            length_filters = {}
1132
 
            for key in key_filter:
1133
 
                search_prefix = self._search_prefix_filter(key)
1134
 
                length_filter = length_filters.setdefault(
1135
 
                    len(search_prefix), set())
1136
 
                length_filter.add(search_prefix)
1137
 
                prefix_to_keys.setdefault(search_prefix, []).append(key)
1138
 
 
1139
 
            if (self._node_width in length_filters and
1140
 
                    len(length_filters) == 1):
1141
 
                # all of the search prefixes match exactly _node_width. This
1142
 
                # means that everything is an exact match, and we can do a
1143
 
                # lookup into self._items, rather than iterating over the items
1144
 
                # dict.
1145
 
                search_prefixes = length_filters[self._node_width]
1146
 
                for search_prefix in search_prefixes:
1147
 
                    try:
1148
 
                        node = self._items[search_prefix]
1149
 
                    except KeyError:
1150
 
                        # We can ignore this one
1151
 
                        continue
1152
 
                    node_key_filter = prefix_to_keys[search_prefix]
1153
 
                    if node.__class__ is StaticTuple:
1154
 
                        keys[node] = (search_prefix, node_key_filter)
1155
 
                    else:
1156
 
                        yield node, node_key_filter
1157
 
            else:
1158
 
                # The slow way. We walk every item in self._items, and check to
1159
 
                # see if there are any matches
1160
 
                length_filters_itemview = viewitems(length_filters)
1161
 
                for prefix, node in viewitems(self._items):
1162
 
                    node_key_filter = []
1163
 
                    for length, length_filter in length_filters_itemview:
1164
 
                        sub_prefix = prefix[:length]
1165
 
                        if sub_prefix in length_filter:
1166
 
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
1167
 
                    if node_key_filter:  # this key matched something, yield it
1168
 
                        if node.__class__ is StaticTuple:
1169
 
                            keys[node] = (prefix, node_key_filter)
1170
 
                        else:
1171
 
                            yield node, node_key_filter
1172
 
        if keys:
1173
 
            # Look in the page cache for some more bytes
1174
 
            found_keys = set()
1175
 
            for key in keys:
1176
 
                try:
1177
 
                    bytes = _get_cache()[key]
1178
 
                except KeyError:
1179
 
                    continue
1180
 
                else:
1181
 
                    node = _deserialise(bytes, key,
1182
 
                                        search_key_func=self._search_key_func)
1183
 
                    prefix, node_key_filter = keys[key]
1184
 
                    self._items[prefix] = node
1185
 
                    found_keys.add(key)
1186
 
                    yield node, node_key_filter
1187
 
            for key in found_keys:
1188
 
                del keys[key]
1189
 
        if keys:
1190
 
            # demand load some pages.
1191
 
            if batch_size is None:
1192
 
                # Read all the keys in
1193
 
                batch_size = len(keys)
1194
 
            key_order = list(keys)
1195
 
            for batch_start in range(0, len(key_order), batch_size):
1196
 
                batch = key_order[batch_start:batch_start + batch_size]
1197
 
                # We have to fully consume the stream so there is no pending
1198
 
                # I/O, so we buffer the nodes for now.
1199
 
                stream = store.get_record_stream(batch, 'unordered', True)
1200
 
                node_and_filters = []
1201
 
                for record in stream:
1202
 
                    bytes = record.get_bytes_as('fulltext')
1203
 
                    node = _deserialise(bytes, record.key,
1204
 
                                        search_key_func=self._search_key_func)
1205
 
                    prefix, node_key_filter = keys[record.key]
1206
 
                    node_and_filters.append((node, node_key_filter))
1207
 
                    self._items[prefix] = node
1208
 
                    _get_cache()[record.key] = bytes
1209
 
                for info in node_and_filters:
1210
 
                    yield info
1211
 
 
1212
 
    def map(self, store, key, value):
1213
 
        """Map key to value."""
1214
 
        if not len(self._items):
1215
 
            raise AssertionError("can't map in an empty InternalNode.")
1216
 
        search_key = self._search_key(key)
1217
 
        if self._node_width != len(self._search_prefix) + 1:
1218
 
            raise AssertionError("node width mismatch: %d is not %d" %
1219
 
                                 (self._node_width, len(self._search_prefix) + 1))
1220
 
        if not search_key.startswith(self._search_prefix):
1221
 
            # This key doesn't fit in this index, so we need to split at the
1222
 
            # point where it would fit, insert self into that internal node,
1223
 
            # and then map this key into that node.
1224
 
            new_prefix = self.common_prefix(self._search_prefix,
1225
 
                                            search_key)
1226
 
            new_parent = InternalNode(new_prefix,
1227
 
                                      search_key_func=self._search_key_func)
1228
 
            new_parent.set_maximum_size(self._maximum_size)
1229
 
            new_parent._key_width = self._key_width
1230
 
            new_parent.add_node(self._search_prefix[:len(new_prefix) + 1],
1231
 
                                self)
1232
 
            return new_parent.map(store, key, value)
1233
 
        children = [node for node, _ in self._iter_nodes(
1234
 
            store, key_filter=[key])]
1235
 
        if children:
1236
 
            child = children[0]
1237
 
        else:
1238
 
            # new child needed:
1239
 
            child = self._new_child(search_key, LeafNode)
1240
 
        old_len = len(child)
1241
 
        if isinstance(child, LeafNode):
1242
 
            old_size = child._current_size()
1243
 
        else:
1244
 
            old_size = None
1245
 
        prefix, node_details = child.map(store, key, value)
1246
 
        if len(node_details) == 1:
1247
 
            # child may have shrunk, or might be a new node
1248
 
            child = node_details[0][1]
1249
 
            self._len = self._len - old_len + len(child)
1250
 
            self._items[search_key] = child
1251
 
            self._key = None
1252
 
            new_node = self
1253
 
            if isinstance(child, LeafNode):
1254
 
                if old_size is None:
1255
 
                    # The old node was an InternalNode which means it has now
1256
 
                    # collapsed, so we need to check if it will chain to a
1257
 
                    # collapse at this level.
1258
 
                    trace.mutter("checking remap as InternalNode -> LeafNode")
1259
 
                    new_node = self._check_remap(store)
1260
 
                else:
1261
 
                    # If the LeafNode has shrunk in size, we may want to run
1262
 
                    # a remap check. Checking for a remap is expensive though
1263
 
                    # and the frequency of a successful remap is very low.
1264
 
                    # Shrinkage by small amounts is common, so we only do the
1265
 
                    # remap check if the new_size is low or the shrinkage
1266
 
                    # amount is over a configurable limit.
1267
 
                    new_size = child._current_size()
1268
 
                    shrinkage = old_size - new_size
1269
 
                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE or
1270
 
                            shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1271
 
                        trace.mutter(
1272
 
                            "checking remap as size shrunk by %d to be %d",
1273
 
                            shrinkage, new_size)
1274
 
                        new_node = self._check_remap(store)
1275
 
            if new_node._search_prefix is None:
1276
 
                raise AssertionError("_search_prefix should not be None")
1277
 
            return new_node._search_prefix, [(b'', new_node)]
1278
 
        # child has overflown - create a new intermediate node.
1279
 
        # XXX: This is where we might want to try and expand our depth
1280
 
        # to refer to more bytes of every child (which would give us
1281
 
        # multiple pointers to child nodes, but less intermediate nodes)
1282
 
        child = self._new_child(search_key, InternalNode)
1283
 
        child._search_prefix = prefix
1284
 
        for split, node in node_details:
1285
 
            child.add_node(split, node)
1286
 
        self._len = self._len - old_len + len(child)
1287
 
        self._key = None
1288
 
        return self._search_prefix, [(b"", self)]
1289
 
 
1290
 
    def _new_child(self, search_key, klass):
1291
 
        """Create a new child node of type klass."""
1292
 
        child = klass()
1293
 
        child.set_maximum_size(self._maximum_size)
1294
 
        child._key_width = self._key_width
1295
 
        child._search_key_func = self._search_key_func
1296
 
        self._items[search_key] = child
1297
 
        return child
1298
 
 
1299
 
    def serialise(self, store):
1300
 
        """Serialise the node to store.
1301
 
 
1302
 
        :param store: A VersionedFiles honouring the CHK extensions.
1303
 
        :return: An iterable of the keys inserted by this operation.
1304
 
        """
1305
 
        for node in viewvalues(self._items):
1306
 
            if isinstance(node, StaticTuple):
1307
 
                # Never deserialised.
1308
 
                continue
1309
 
            if node._key is not None:
1310
 
                # Never altered
1311
 
                continue
1312
 
            for key in node.serialise(store):
1313
 
                yield key
1314
 
        lines = [b"chknode:\n"]
1315
 
        lines.append(b"%d\n" % self._maximum_size)
1316
 
        lines.append(b"%d\n" % self._key_width)
1317
 
        lines.append(b"%d\n" % self._len)
1318
 
        if self._search_prefix is None:
1319
 
            raise AssertionError("_search_prefix should not be None")
1320
 
        lines.append(b'%s\n' % (self._search_prefix,))
1321
 
        prefix_len = len(self._search_prefix)
1322
 
        for prefix, node in sorted(viewitems(self._items)):
1323
 
            if isinstance(node, StaticTuple):
1324
 
                key = node[0]
1325
 
            else:
1326
 
                key = node._key[0]
1327
 
            serialised = b"%s\x00%s\n" % (prefix, key)
1328
 
            if not serialised.startswith(self._search_prefix):
1329
 
                raise AssertionError("prefixes mismatch: %s must start with %s"
1330
 
                                     % (serialised, self._search_prefix))
1331
 
            lines.append(serialised[prefix_len:])
1332
 
        sha1, _, _ = store.add_lines((None,), (), lines)
1333
 
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
1334
 
        _get_cache()[self._key] = b''.join(lines)
1335
 
        yield self._key
1336
 
 
1337
 
    def _search_key(self, key):
1338
 
        """Return the serialised key for key in this node."""
1339
 
        # search keys are fixed width. All will be self._node_width wide, so we
1340
 
        # pad as necessary.
1341
 
        return (self._search_key_func(key) + b'\x00' * self._node_width)[:self._node_width]
1342
 
 
1343
 
    def _search_prefix_filter(self, key):
1344
 
        """Serialise key for use as a prefix filter in iteritems."""
1345
 
        return self._search_key_func(key)[:self._node_width]
1346
 
 
1347
 
    def _split(self, offset):
1348
 
        """Split this node into smaller nodes starting at offset.
1349
 
 
1350
 
        :param offset: The offset to start the new child nodes at.
1351
 
        :return: An iterable of (prefix, node) tuples. prefix is a byte
1352
 
            prefix for reaching node.
1353
 
        """
1354
 
        if offset >= self._node_width:
1355
 
            for node in valueview(self._items):
1356
 
                for result in node._split(offset):
1357
 
                    yield result
1358
 
 
1359
 
    def refs(self):
1360
 
        """Return the references to other CHK's held by this node."""
1361
 
        if self._key is None:
1362
 
            raise AssertionError("unserialised nodes have no refs.")
1363
 
        refs = []
1364
 
        for value in viewvalues(self._items):
1365
 
            if isinstance(value, StaticTuple):
1366
 
                refs.append(value)
1367
 
            else:
1368
 
                refs.append(value.key())
1369
 
        return refs
1370
 
 
1371
 
    def _compute_search_prefix(self, extra_key=None):
1372
 
        """Return the unique key prefix for this node.
1373
 
 
1374
 
        :return: A bytestring of the longest search key prefix that is
1375
 
            unique within this node.
1376
 
        """
1377
 
        self._search_prefix = self.common_prefix_for_keys(self._items)
1378
 
        return self._search_prefix
1379
 
 
1380
 
    def unmap(self, store, key, check_remap=True):
1381
 
        """Remove key from this node and its children."""
1382
 
        if not len(self._items):
1383
 
            raise AssertionError("can't unmap in an empty InternalNode.")
1384
 
        children = [node for node, _
1385
 
                    in self._iter_nodes(store, key_filter=[key])]
1386
 
        if children:
1387
 
            child = children[0]
1388
 
        else:
1389
 
            raise KeyError(key)
1390
 
        self._len -= 1
1391
 
        unmapped = child.unmap(store, key)
1392
 
        self._key = None
1393
 
        search_key = self._search_key(key)
1394
 
        if len(unmapped) == 0:
1395
 
            # All child nodes are gone, remove the child:
1396
 
            del self._items[search_key]
1397
 
            unmapped = None
1398
 
        else:
1399
 
            # Stash the returned node
1400
 
            self._items[search_key] = unmapped
1401
 
        if len(self._items) == 1:
1402
 
            # this node is no longer needed:
1403
 
            return list(viewvalues(self._items))[0]
1404
 
        if isinstance(unmapped, InternalNode):
1405
 
            return self
1406
 
        if check_remap:
1407
 
            return self._check_remap(store)
1408
 
        else:
1409
 
            return self
1410
 
 
1411
 
    def _check_remap(self, store):
1412
 
        """Check if all keys contained by children fit in a single LeafNode.
1413
 
 
1414
 
        :param store: A store to use for reading more nodes
1415
 
        :return: Either self, or a new LeafNode which should replace self.
1416
 
        """
1417
 
        # Logic for how we determine when we need to rebuild
1418
 
        # 1) Implicitly unmap() is removing a key which means that the child
1419
 
        #    nodes are going to be shrinking by some extent.
1420
 
        # 2) If all children are LeafNodes, it is possible that they could be
1421
 
        #    combined into a single LeafNode, which can then completely replace
1422
 
        #    this internal node with a single LeafNode
1423
 
        # 3) If *one* child is an InternalNode, we assume it has already done
1424
 
        #    all the work to determine that its children cannot collapse, and
1425
 
        #    we can then assume that those nodes *plus* the current nodes don't
1426
 
        #    have a chance of collapsing either.
1427
 
        #    So a very cheap check is to just say if 'unmapped' is an
1428
 
        #    InternalNode, we don't have to check further.
1429
 
 
1430
 
        # TODO: Another alternative is to check the total size of all known
1431
 
        #       LeafNodes. If there is some formula we can use to determine the
1432
 
        #       final size without actually having to read in any more
1433
 
        #       children, it would be nice to have. However, we have to be
1434
 
        #       careful with stuff like nodes that pull out the common prefix
1435
 
        #       of each key, as adding a new key can change the common prefix
1436
 
        #       and cause size changes greater than the length of one key.
1437
 
        #       So for now, we just add everything to a new Leaf until it
1438
 
        #       splits, as we know that will give the right answer
1439
 
        new_leaf = LeafNode(search_key_func=self._search_key_func)
1440
 
        new_leaf.set_maximum_size(self._maximum_size)
1441
 
        new_leaf._key_width = self._key_width
1442
 
        # A batch_size of 16 was chosen because:
1443
 
        #   a) In testing, a 4k page held 14 times. So if we have more than 16
1444
 
        #      leaf nodes we are unlikely to hold them in a single new leaf
1445
 
        #      node. This still allows for 1 round trip
1446
 
        #   b) With 16-way fan out, we can still do a single round trip
1447
 
        #   c) With 255-way fan out, we don't want to read all 255 and destroy
1448
 
        #      the page cache, just to determine that we really don't need it.
1449
 
        for node, _ in self._iter_nodes(store, batch_size=16):
1450
 
            if isinstance(node, InternalNode):
1451
 
                # Without looking at any leaf nodes, we are sure
1452
 
                return self
1453
 
            for key, value in viewitems(node._items):
1454
 
                if new_leaf._map_no_split(key, value):
1455
 
                    return self
1456
 
        trace.mutter("remap generated a new LeafNode")
1457
 
        return new_leaf
1458
 
 
1459
 
 
1460
 
def _deserialise(data, key, search_key_func):
1461
 
    """Helper for repositorydetails - convert bytes to a node."""
1462
 
    if data.startswith(b"chkleaf:\n"):
1463
 
        node = LeafNode.deserialise(data, key, search_key_func=search_key_func)
1464
 
    elif data.startswith(b"chknode:\n"):
1465
 
        node = InternalNode.deserialise(data, key,
1466
 
                                        search_key_func=search_key_func)
1467
 
    else:
1468
 
        raise AssertionError("Unknown node type.")
1469
 
    return node
1470
 
 
1471
 
 
1472
 
class CHKMapDifference(object):
1473
 
    """Iterate the stored pages and key,value pairs for (new - old).
1474
 
 
1475
 
    This class provides a generator over the stored CHK pages and the
1476
 
    (key, value) pairs that are in any of the new maps and not in any of the
1477
 
    old maps.
1478
 
 
1479
 
    Note that it may yield chk pages that are common (especially root nodes),
1480
 
    but it won't yield (key,value) pairs that are common.
1481
 
    """
1482
 
 
1483
 
    def __init__(self, store, new_root_keys, old_root_keys,
1484
 
                 search_key_func, pb=None):
1485
 
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
1486
 
        #       force callers to use StaticTuple, because there will often be
1487
 
        #       lots of keys passed in here. And even if we cast it locally,
1488
 
        #       that just meanst that we will have *both* a StaticTuple and a
1489
 
        #       tuple() in memory, referring to the same object. (so a net
1490
 
        #       increase in memory, not a decrease.)
1491
 
        self._store = store
1492
 
        self._new_root_keys = new_root_keys
1493
 
        self._old_root_keys = old_root_keys
1494
 
        self._pb = pb
1495
 
        # All uninteresting chks that we have seen. By the time they are added
1496
 
        # here, they should be either fully ignored, or queued up for
1497
 
        # processing
1498
 
        # TODO: This might grow to a large size if there are lots of merge
1499
 
        #       parents, etc. However, it probably doesn't scale to O(history)
1500
 
        #       like _processed_new_refs does.
1501
 
        self._all_old_chks = set(self._old_root_keys)
1502
 
        # All items that we have seen from the old_root_keys
1503
 
        self._all_old_items = set()
1504
 
        # These are interesting items which were either read, or already in the
1505
 
        # interesting queue (so we don't need to walk them again)
1506
 
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
1507
 
        #       SimpleSet here.
1508
 
        self._processed_new_refs = set()
1509
 
        self._search_key_func = search_key_func
1510
 
 
1511
 
        # The uninteresting and interesting nodes to be searched
1512
 
        self._old_queue = []
1513
 
        self._new_queue = []
1514
 
        # Holds the (key, value) items found when processing the root nodes,
1515
 
        # waiting for the uninteresting nodes to be walked
1516
 
        self._new_item_queue = []
1517
 
        self._state = None
1518
 
 
1519
 
    def _read_nodes_from_store(self, keys):
1520
 
        # We chose not to use _get_cache(), because we think in
1521
 
        # terms of records to be yielded. Also, we expect to touch each page
1522
 
        # only 1 time during this code. (We may want to evaluate saving the
1523
 
        # raw bytes into the page cache, which would allow a working tree
1524
 
        # update after the fetch to not have to read the bytes again.)
1525
 
        as_st = StaticTuple.from_sequence
1526
 
        stream = self._store.get_record_stream(keys, 'unordered', True)
1527
 
        for record in stream:
1528
 
            if self._pb is not None:
1529
 
                self._pb.tick()
1530
 
            if record.storage_kind == 'absent':
1531
 
                raise errors.NoSuchRevision(self._store, record.key)
1532
 
            bytes = record.get_bytes_as('fulltext')
1533
 
            node = _deserialise(bytes, record.key,
1534
 
                                search_key_func=self._search_key_func)
1535
 
            if isinstance(node, InternalNode):
1536
 
                # Note we don't have to do node.refs() because we know that
1537
 
                # there are no children that have been pushed into this node
1538
 
                # Note: Using as_st() here seemed to save 1.2MB, which would
1539
 
                #       indicate that we keep 100k prefix_refs around while
1540
 
                #       processing. They *should* be shorter lived than that...
1541
 
                #       It does cost us ~10s of processing time
1542
 
                prefix_refs = list(viewitems(node._items))
1543
 
                items = []
1544
 
            else:
1545
 
                prefix_refs = []
1546
 
                # Note: We don't use a StaticTuple here. Profiling showed a
1547
 
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
1548
 
                #       But a significant slowdown (15s / 145s, or 10%)
1549
 
                items = list(viewitems(node._items))
1550
 
            yield record, node, prefix_refs, items
1551
 
 
1552
 
    def _read_old_roots(self):
1553
 
        old_chks_to_enqueue = []
1554
 
        all_old_chks = self._all_old_chks
1555
 
        for record, node, prefix_refs, items in \
1556
 
                self._read_nodes_from_store(self._old_root_keys):
1557
 
            # Uninteresting node
1558
 
            prefix_refs = [p_r for p_r in prefix_refs
1559
 
                           if p_r[1] not in all_old_chks]
1560
 
            new_refs = [p_r[1] for p_r in prefix_refs]
1561
 
            all_old_chks.update(new_refs)
1562
 
            # TODO: This might be a good time to turn items into StaticTuple
1563
 
            #       instances and possibly intern them. However, this does not
1564
 
            #       impact 'initial branch' performance, so I'm not worrying
1565
 
            #       about this yet
1566
 
            self._all_old_items.update(items)
1567
 
            # Queue up the uninteresting references
1568
 
            # Don't actually put them in the 'to-read' queue until we have
1569
 
            # finished checking the interesting references
1570
 
            old_chks_to_enqueue.extend(prefix_refs)
1571
 
        return old_chks_to_enqueue
1572
 
 
1573
 
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1574
 
        # At this point, we have read all the uninteresting and interesting
1575
 
        # items, so we can queue up the uninteresting stuff, knowing that we've
1576
 
        # handled the interesting ones
1577
 
        for prefix, ref in old_chks_to_enqueue:
1578
 
            not_interesting = True
1579
 
            for i in range(len(prefix), 0, -1):
1580
 
                if prefix[:i] in new_prefixes:
1581
 
                    not_interesting = False
1582
 
                    break
1583
 
            if not_interesting:
1584
 
                # This prefix is not part of the remaining 'interesting set'
1585
 
                continue
1586
 
            self._old_queue.append(ref)
1587
 
 
1588
 
    def _read_all_roots(self):
1589
 
        """Read the root pages.
1590
 
 
1591
 
        This is structured as a generator, so that the root records can be
1592
 
        yielded up to whoever needs them without any buffering.
1593
 
        """
1594
 
        # This is the bootstrap phase
1595
 
        if not self._old_root_keys:
1596
 
            # With no old_root_keys we can just shortcut and be ready
1597
 
            # for _flush_new_queue
1598
 
            self._new_queue = list(self._new_root_keys)
1599
 
            return
1600
 
        old_chks_to_enqueue = self._read_old_roots()
1601
 
        # filter out any root keys that are already known to be uninteresting
1602
 
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1603
 
        # These are prefixes that are present in new_keys that we are
1604
 
        # thinking to yield
1605
 
        new_prefixes = set()
1606
 
        # We are about to yield all of these, so we don't want them getting
1607
 
        # added a second time
1608
 
        processed_new_refs = self._processed_new_refs
1609
 
        processed_new_refs.update(new_keys)
1610
 
        for record, node, prefix_refs, items in \
1611
 
                self._read_nodes_from_store(new_keys):
1612
 
            # At this level, we now know all the uninteresting references
1613
 
            # So we filter and queue up whatever is remaining
1614
 
            prefix_refs = [p_r for p_r in prefix_refs
1615
 
                           if p_r[1] not in self._all_old_chks and
1616
 
                           p_r[1] not in processed_new_refs]
1617
 
            refs = [p_r[1] for p_r in prefix_refs]
1618
 
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
1619
 
            self._new_queue.extend(refs)
1620
 
            # TODO: We can potentially get multiple items here, however the
1621
 
            #       current design allows for this, as callers will do the work
1622
 
            #       to make the results unique. We might profile whether we
1623
 
            #       gain anything by ensuring unique return values for items
1624
 
            # TODO: This might be a good time to cast to StaticTuple, as
1625
 
            #       self._new_item_queue will hold the contents of multiple
1626
 
            #       records for an extended lifetime
1627
 
            new_items = [item for item in items
1628
 
                         if item not in self._all_old_items]
1629
 
            self._new_item_queue.extend(new_items)
1630
 
            new_prefixes.update([self._search_key_func(item[0])
1631
 
                                 for item in new_items])
1632
 
            processed_new_refs.update(refs)
1633
 
            yield record
1634
 
        # For new_prefixes we have the full length prefixes queued up.
1635
 
        # However, we also need possible prefixes. (If we have a known ref to
1636
 
        # 'ab', then we also need to include 'a'.) So expand the
1637
 
        # new_prefixes to include all shorter prefixes
1638
 
        for prefix in list(new_prefixes):
1639
 
            new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
1640
 
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1641
 
 
1642
 
    def _flush_new_queue(self):
1643
 
        # No need to maintain the heap invariant anymore, just pull things out
1644
 
        # and process them
1645
 
        refs = set(self._new_queue)
1646
 
        self._new_queue = []
1647
 
        # First pass, flush all interesting items and convert to using direct refs
1648
 
        all_old_chks = self._all_old_chks
1649
 
        processed_new_refs = self._processed_new_refs
1650
 
        all_old_items = self._all_old_items
1651
 
        new_items = [item for item in self._new_item_queue
1652
 
                     if item not in all_old_items]
1653
 
        self._new_item_queue = []
1654
 
        if new_items:
1655
 
            yield None, new_items
1656
 
        refs = refs.difference(all_old_chks)
1657
 
        processed_new_refs.update(refs)
1658
 
        while refs:
1659
 
            # TODO: Using a SimpleSet for self._processed_new_refs and
1660
 
            #       saved as much as 10MB of peak memory. However, it requires
1661
 
            #       implementing a non-pyrex version.
1662
 
            next_refs = set()
1663
 
            next_refs_update = next_refs.update
1664
 
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1665
 
            # from 1m54s to 1m51s. Consider it.
1666
 
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
1667
 
                if all_old_items:
1668
 
                    # using the 'if' check saves about 145s => 141s, when
1669
 
                    # streaming initial branch of Launchpad data.
1670
 
                    items = [item for item in items
1671
 
                             if item not in all_old_items]
1672
 
                yield record, items
1673
 
                next_refs_update([p_r[1] for p_r in p_refs])
1674
 
                del p_refs
1675
 
            # set1.difference(set/dict) walks all of set1, and checks if it
1676
 
            # exists in 'other'.
1677
 
            # set1.difference(iterable) walks all of iterable, and does a
1678
 
            # 'difference_update' on a clone of set1. Pick wisely based on the
1679
 
            # expected sizes of objects.
1680
 
            # in our case it is expected that 'new_refs' will always be quite
1681
 
            # small.
1682
 
            next_refs = next_refs.difference(all_old_chks)
1683
 
            next_refs = next_refs.difference(processed_new_refs)
1684
 
            processed_new_refs.update(next_refs)
1685
 
            refs = next_refs
1686
 
 
1687
 
    def _process_next_old(self):
1688
 
        # Since we don't filter uninteresting any further than during
1689
 
        # _read_all_roots, process the whole queue in a single pass.
1690
 
        refs = self._old_queue
1691
 
        self._old_queue = []
1692
 
        all_old_chks = self._all_old_chks
1693
 
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1694
 
            # TODO: Use StaticTuple here?
1695
 
            self._all_old_items.update(items)
1696
 
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
1697
 
            self._old_queue.extend(refs)
1698
 
            all_old_chks.update(refs)
1699
 
 
1700
 
    def _process_queues(self):
1701
 
        while self._old_queue:
1702
 
            self._process_next_old()
1703
 
        return self._flush_new_queue()
1704
 
 
1705
 
    def process(self):
1706
 
        for record in self._read_all_roots():
1707
 
            yield record, []
1708
 
        for record, items in self._process_queues():
1709
 
            yield record, items
1710
 
 
1711
 
 
1712
 
def iter_interesting_nodes(store, interesting_root_keys,
1713
 
                           uninteresting_root_keys, pb=None):
1714
 
    """Given root keys, find interesting nodes.
1715
 
 
1716
 
    Evaluate nodes referenced by interesting_root_keys. Ones that are also
1717
 
    referenced from uninteresting_root_keys are not considered interesting.
1718
 
 
1719
 
    :param interesting_root_keys: keys which should be part of the
1720
 
        "interesting" nodes (which will be yielded)
1721
 
    :param uninteresting_root_keys: keys which should be filtered out of the
1722
 
        result set.
1723
 
    :return: Yield
1724
 
        (interesting record, {interesting key:values})
1725
 
    """
1726
 
    iterator = CHKMapDifference(store, interesting_root_keys,
1727
 
                                uninteresting_root_keys,
1728
 
                                search_key_func=store._search_key_func,
1729
 
                                pb=pb)
1730
 
    return iterator.process()
1731
 
 
1732
 
 
1733
 
try:
1734
 
    from ._chk_map_pyx import (
1735
 
        _bytes_to_text_key,
1736
 
        _search_key_16,
1737
 
        _search_key_255,
1738
 
        _deserialise_leaf_node,
1739
 
        _deserialise_internal_node,
1740
 
        )
1741
 
except ImportError as e:
1742
 
    osutils.failed_to_load_extension(e)
1743
 
    from ._chk_map_py import (
1744
 
        _bytes_to_text_key,
1745
 
        _search_key_16,
1746
 
        _search_key_255,
1747
 
        _deserialise_leaf_node,
1748
 
        _deserialise_internal_node,
1749
 
        )  # noqa: F401
1750
 
search_key_registry.register(b'hash-16-way', _search_key_16)
1751
 
search_key_registry.register(b'hash-255-way', _search_key_255)
1752
 
 
1753
 
 
1754
 
def _check_key(key):
1755
 
    """Helper function to assert that a key is properly formatted.
1756
 
 
1757
 
    This generally shouldn't be used in production code, but it can be helpful
1758
 
    to debug problems.
1759
 
    """
1760
 
    if not isinstance(key, StaticTuple):
1761
 
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
1762
 
    if len(key) != 1:
1763
 
        raise ValueError('key %r should have length 1, not %d' %
1764
 
                         (key, len(key),))
1765
 
    if not isinstance(key[0], str):
1766
 
        raise TypeError('key %r should hold a str, not %r'
1767
 
                        % (key, type(key[0])))
1768
 
    if not key[0].startswith('sha1:'):
1769
 
        raise ValueError('key %r should point to a sha1:' % (key,))