/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/chk_map.py

  • Committer: Martin
  • Date: 2018-11-16 19:10:17 UTC
  • mto: This revision was merged to the branch mainline in revision 7177.
  • Revision ID: gzlist@googlemail.com-20181116191017-kyedz1qck0ovon3h
Remove lazy_regexp reset in bt.test_source

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Persistent maps from tuple_of_strings->string using CHK stores.
 
18
 
 
19
Overview and current status:
 
20
 
 
21
The CHKMap class implements a dict from tuple_of_strings->string by using a trie
 
22
with internal nodes of 8-bit fan out; The key tuples are mapped to strings by
 
23
joining them by \x00, and \x00 padding shorter keys out to the length of the
 
24
longest key. Leaf nodes are packed as densely as possible, and internal nodes
 
25
are all an additional 8-bits wide leading to a sparse upper tree.
 
26
 
 
27
Updates to a CHKMap are done preferentially via the apply_delta method, to
 
28
allow optimisation of the update operation; but individual map/unmap calls are
 
29
possible and supported. Individual changes via map/unmap are buffered in memory
 
30
until the _save method is called to force serialisation of the tree.
 
31
apply_delta records its changes immediately by performing an implicit _save.
 
32
 
 
33
TODO:
 
34
-----
 
35
 
 
36
Densely packed upper nodes.
 
37
 
 
38
"""
 
39
 
 
40
from __future__ import absolute_import
 
41
 
 
42
import heapq
 
43
import threading
 
44
 
 
45
from .. import (
 
46
    errors,
 
47
    lru_cache,
 
48
    osutils,
 
49
    registry,
 
50
    static_tuple,
 
51
    trace,
 
52
    )
 
53
from ..sixish import (
 
54
    viewitems,
 
55
    viewvalues,
 
56
    )
 
57
from ..sixish import PY3
 
58
from ..static_tuple import StaticTuple
 
59
 
 
60
# approx 4MB
 
61
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
 
62
# out, it takes 3.1MB to cache the layer.
 
63
_PAGE_CACHE_SIZE = 4 * 1024 * 1024
 
64
# Per thread caches for 2 reasons:
 
65
# - in the server we may be serving very different content, so we get less
 
66
#   cache thrashing.
 
67
# - we avoid locking on every cache lookup.
 
68
_thread_caches = threading.local()
 
69
# The page cache.
 
70
_thread_caches.page_cache = None
 
71
 
 
72
 
 
73
def _get_cache():
 
74
    """Get the per-thread page cache.
 
75
 
 
76
    We need a function to do this because in a new thread the _thread_caches
 
77
    threading.local object does not have the cache initialized yet.
 
78
    """
 
79
    page_cache = getattr(_thread_caches, 'page_cache', None)
 
80
    if page_cache is None:
 
81
        # We are caching bytes so len(value) is perfectly accurate
 
82
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
 
83
        _thread_caches.page_cache = page_cache
 
84
    return page_cache
 
85
 
 
86
 
 
87
def clear_cache():
 
88
    _get_cache().clear()
 
89
 
 
90
 
 
91
# If a ChildNode falls below this many bytes, we check for a remap
 
92
_INTERESTING_NEW_SIZE = 50
 
93
# If a ChildNode shrinks by more than this amount, we check for a remap
 
94
_INTERESTING_SHRINKAGE_LIMIT = 20
 
95
 
 
96
 
 
97
def _search_key_plain(key):
 
98
    """Map the key tuple into a search string that just uses the key bytes."""
 
99
    return b'\x00'.join(key)
 
100
 
 
101
 
 
102
search_key_registry = registry.Registry()
 
103
search_key_registry.register(b'plain', _search_key_plain)
 
104
 
 
105
 
 
106
class CHKMap(object):
 
107
    """A persistent map from string to string backed by a CHK store."""
 
108
 
 
109
    __slots__ = ('_store', '_root_node', '_search_key_func')
 
110
 
 
111
    def __init__(self, store, root_key, search_key_func=None):
 
112
        """Create a CHKMap object.
 
113
 
 
114
        :param store: The store the CHKMap is stored in.
 
115
        :param root_key: The root key of the map. None to create an empty
 
116
            CHKMap.
 
117
        :param search_key_func: A function mapping a key => bytes. These bytes
 
118
            are then used by the internal nodes to split up leaf nodes into
 
119
            multiple pages.
 
120
        """
 
121
        self._store = store
 
122
        if search_key_func is None:
 
123
            search_key_func = _search_key_plain
 
124
        self._search_key_func = search_key_func
 
125
        if root_key is None:
 
126
            self._root_node = LeafNode(search_key_func=search_key_func)
 
127
        else:
 
128
            self._root_node = self._node_key(root_key)
 
129
 
 
130
    def apply_delta(self, delta):
 
131
        """Apply a delta to the map.
 
132
 
 
133
        :param delta: An iterable of old_key, new_key, new_value tuples.
 
134
            If new_key is not None, then new_key->new_value is inserted
 
135
            into the map; if old_key is not None, then the old mapping
 
136
            of old_key is removed.
 
137
        """
 
138
        has_deletes = False
 
139
        # Check preconditions first.
 
140
        as_st = StaticTuple.from_sequence
 
141
        new_items = {as_st(key) for (old, key, value) in delta
 
142
                     if key is not None and old is None}
 
143
        existing_new = list(self.iteritems(key_filter=new_items))
 
144
        if existing_new:
 
145
            raise errors.InconsistentDeltaDelta(delta,
 
146
                                                "New items are already in the map %r." % existing_new)
 
147
        # Now apply changes.
 
148
        for old, new, value in delta:
 
149
            if old is not None and old != new:
 
150
                self.unmap(old, check_remap=False)
 
151
                has_deletes = True
 
152
        for old, new, value in delta:
 
153
            if new is not None:
 
154
                self.map(new, value)
 
155
        if has_deletes:
 
156
            self._check_remap()
 
157
        return self._save()
 
158
 
 
159
    def _ensure_root(self):
 
160
        """Ensure that the root node is an object not a key."""
 
161
        if isinstance(self._root_node, StaticTuple):
 
162
            # Demand-load the root
 
163
            self._root_node = self._get_node(self._root_node)
 
164
 
 
165
    def _get_node(self, node):
 
166
        """Get a node.
 
167
 
 
168
        Note that this does not update the _items dict in objects containing a
 
169
        reference to this node. As such it does not prevent subsequent IO being
 
170
        performed.
 
171
 
 
172
        :param node: A tuple key or node object.
 
173
        :return: A node object.
 
174
        """
 
175
        if isinstance(node, StaticTuple):
 
176
            bytes = self._read_bytes(node)
 
177
            return _deserialise(bytes, node,
 
178
                                search_key_func=self._search_key_func)
 
179
        else:
 
180
            return node
 
181
 
 
182
    def _read_bytes(self, key):
 
183
        try:
 
184
            return _get_cache()[key]
 
185
        except KeyError:
 
186
            stream = self._store.get_record_stream([key], 'unordered', True)
 
187
            bytes = next(stream).get_bytes_as('fulltext')
 
188
            _get_cache()[key] = bytes
 
189
            return bytes
 
190
 
 
191
    def _dump_tree(self, include_keys=False, encoding='utf-8'):
 
192
        """Return the tree in a string representation."""
 
193
        self._ensure_root()
 
194
        if PY3:
 
195
            def decode(x): return x.decode(encoding)
 
196
        else:
 
197
            def decode(x): return x
 
198
        res = self._dump_tree_node(self._root_node, prefix=b'', indent='',
 
199
                                   decode=decode, include_keys=include_keys)
 
200
        res.append('')  # Give a trailing '\n'
 
201
        return '\n'.join(res)
 
202
 
 
203
    def _dump_tree_node(self, node, prefix, indent, decode, include_keys=True):
 
204
        """For this node and all children, generate a string representation."""
 
205
        result = []
 
206
        if not include_keys:
 
207
            key_str = ''
 
208
        else:
 
209
            node_key = node.key()
 
210
            if node_key is not None:
 
211
                key_str = ' %s' % (decode(node_key[0]),)
 
212
            else:
 
213
                key_str = ' None'
 
214
        result.append('%s%r %s%s' % (indent, decode(prefix), node.__class__.__name__,
 
215
                                     key_str))
 
216
        if isinstance(node, InternalNode):
 
217
            # Trigger all child nodes to get loaded
 
218
            list(node._iter_nodes(self._store))
 
219
            for prefix, sub in sorted(viewitems(node._items)):
 
220
                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
 
221
                                                   decode=decode, include_keys=include_keys))
 
222
        else:
 
223
            for key, value in sorted(viewitems(node._items)):
 
224
                # Don't use prefix nor indent here to line up when used in
 
225
                # tests in conjunction with assertEqualDiff
 
226
                result.append('      %r %r' % (
 
227
                    tuple([decode(ke) for ke in key]), decode(value)))
 
228
        return result
 
229
 
 
230
    @classmethod
 
231
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
 
232
                  search_key_func=None):
 
233
        """Create a CHKMap in store with initial_value as the content.
 
234
 
 
235
        :param store: The store to record initial_value in, a VersionedFiles
 
236
            object with 1-tuple keys supporting CHK key generation.
 
237
        :param initial_value: A dict to store in store. Its keys and values
 
238
            must be bytestrings.
 
239
        :param maximum_size: The maximum_size rule to apply to nodes. This
 
240
            determines the size at which no new data is added to a single node.
 
241
        :param key_width: The number of elements in each key_tuple being stored
 
242
            in this map.
 
243
        :param search_key_func: A function mapping a key => bytes. These bytes
 
244
            are then used by the internal nodes to split up leaf nodes into
 
245
            multiple pages.
 
246
        :return: The root chk of the resulting CHKMap.
 
247
        """
 
248
        root_key = klass._create_directly(store, initial_value,
 
249
                                          maximum_size=maximum_size, key_width=key_width,
 
250
                                          search_key_func=search_key_func)
 
251
        if not isinstance(root_key, StaticTuple):
 
252
            raise AssertionError('we got a %s instead of a StaticTuple'
 
253
                                 % (type(root_key),))
 
254
        return root_key
 
255
 
 
256
    @classmethod
 
257
    def _create_via_map(klass, store, initial_value, maximum_size=0,
 
258
                        key_width=1, search_key_func=None):
 
259
        result = klass(store, None, search_key_func=search_key_func)
 
260
        result._root_node.set_maximum_size(maximum_size)
 
261
        result._root_node._key_width = key_width
 
262
        delta = []
 
263
        for key, value in viewitems(initial_value):
 
264
            delta.append((None, key, value))
 
265
        root_key = result.apply_delta(delta)
 
266
        return root_key
 
267
 
 
268
    @classmethod
 
269
    def _create_directly(klass, store, initial_value, maximum_size=0,
 
270
                         key_width=1, search_key_func=None):
 
271
        node = LeafNode(search_key_func=search_key_func)
 
272
        node.set_maximum_size(maximum_size)
 
273
        node._key_width = key_width
 
274
        as_st = StaticTuple.from_sequence
 
275
        node._items = dict((as_st(key), val)
 
276
                           for key, val in viewitems(initial_value))
 
277
        node._raw_size = sum(node._key_value_len(key, value)
 
278
                             for key, value in viewitems(node._items))
 
279
        node._len = len(node._items)
 
280
        node._compute_search_prefix()
 
281
        node._compute_serialised_prefix()
 
282
        if (node._len > 1 and
 
283
            maximum_size and
 
284
                node._current_size() > maximum_size):
 
285
            prefix, node_details = node._split(store)
 
286
            if len(node_details) == 1:
 
287
                raise AssertionError('Failed to split using node._split')
 
288
            node = InternalNode(prefix, search_key_func=search_key_func)
 
289
            node.set_maximum_size(maximum_size)
 
290
            node._key_width = key_width
 
291
            for split, subnode in node_details:
 
292
                node.add_node(split, subnode)
 
293
        keys = list(node.serialise(store))
 
294
        return keys[-1]
 
295
 
 
296
    def iter_changes(self, basis):
 
297
        """Iterate over the changes between basis and self.
 
298
 
 
299
        :return: An iterator of tuples: (key, old_value, new_value). Old_value
 
300
            is None for keys only in self; new_value is None for keys only in
 
301
            basis.
 
302
        """
 
303
        # Overview:
 
304
        # Read both trees in lexographic, highest-first order.
 
305
        # Any identical nodes we skip
 
306
        # Any unique prefixes we output immediately.
 
307
        # values in a leaf node are treated as single-value nodes in the tree
 
308
        # which allows them to be not-special-cased. We know to output them
 
309
        # because their value is a string, not a key(tuple) or node.
 
310
        #
 
311
        # corner cases to beware of when considering this function:
 
312
        # *) common references are at different heights.
 
313
        #    consider two trees:
 
314
        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
 
315
        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
 
316
        #                        'ab':LeafNode={'ab':'bar'}}
 
317
        #     'b': LeafNode={'b'}}
 
318
        #    the node with aaa/aab will only be encountered in the second tree
 
319
        #    after reading the 'a' subtree, but it is encountered in the first
 
320
        #    tree immediately. Variations on this may have read internal nodes
 
321
        #    like this.  we want to cut the entire pending subtree when we
 
322
        #    realise we have a common node.  For this we use a list of keys -
 
323
        #    the path to a node - and check the entire path is clean as we
 
324
        #    process each item.
 
325
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
 
326
            return
 
327
        self._ensure_root()
 
328
        basis._ensure_root()
 
329
        excluded_keys = set()
 
330
        self_node = self._root_node
 
331
        basis_node = basis._root_node
 
332
        # A heap, each element is prefix, node(tuple/NodeObject/string),
 
333
        # key_path (a list of tuples, tail-sharing down the tree.)
 
334
        self_pending = []
 
335
        basis_pending = []
 
336
 
 
337
        def process_node(node, path, a_map, pending):
 
338
            # take a node and expand it
 
339
            node = a_map._get_node(node)
 
340
            if isinstance(node, LeafNode):
 
341
                path = (node._key, path)
 
342
                for key, value in viewitems(node._items):
 
343
                    # For a LeafNode, the key is a serialized_key, rather than
 
344
                    # a search_key, but the heap is using search_keys
 
345
                    search_key = node._search_key_func(key)
 
346
                    heapq.heappush(pending, (search_key, key, value, path))
 
347
            else:
 
348
                # type(node) == InternalNode
 
349
                path = (node._key, path)
 
350
                for prefix, child in viewitems(node._items):
 
351
                    heapq.heappush(pending, (prefix, None, child, path))
 
352
 
 
353
        def process_common_internal_nodes(self_node, basis_node):
 
354
            self_items = set(viewitems(self_node._items))
 
355
            basis_items = set(viewitems(basis_node._items))
 
356
            path = (self_node._key, None)
 
357
            for prefix, child in self_items - basis_items:
 
358
                heapq.heappush(self_pending, (prefix, None, child, path))
 
359
            path = (basis_node._key, None)
 
360
            for prefix, child in basis_items - self_items:
 
361
                heapq.heappush(basis_pending, (prefix, None, child, path))
 
362
 
 
363
        def process_common_leaf_nodes(self_node, basis_node):
 
364
            self_items = set(viewitems(self_node._items))
 
365
            basis_items = set(viewitems(basis_node._items))
 
366
            path = (self_node._key, None)
 
367
            for key, value in self_items - basis_items:
 
368
                prefix = self._search_key_func(key)
 
369
                heapq.heappush(self_pending, (prefix, key, value, path))
 
370
            path = (basis_node._key, None)
 
371
            for key, value in basis_items - self_items:
 
372
                prefix = basis._search_key_func(key)
 
373
                heapq.heappush(basis_pending, (prefix, key, value, path))
 
374
 
 
375
        def process_common_prefix_nodes(self_node, self_path,
 
376
                                        basis_node, basis_path):
 
377
            # Would it be more efficient if we could request both at the same
 
378
            # time?
 
379
            self_node = self._get_node(self_node)
 
380
            basis_node = basis._get_node(basis_node)
 
381
            if (isinstance(self_node, InternalNode) and
 
382
                    isinstance(basis_node, InternalNode)):
 
383
                # Matching internal nodes
 
384
                process_common_internal_nodes(self_node, basis_node)
 
385
            elif (isinstance(self_node, LeafNode) and
 
386
                  isinstance(basis_node, LeafNode)):
 
387
                process_common_leaf_nodes(self_node, basis_node)
 
388
            else:
 
389
                process_node(self_node, self_path, self, self_pending)
 
390
                process_node(basis_node, basis_path, basis, basis_pending)
 
391
        process_common_prefix_nodes(self_node, None, basis_node, None)
 
392
        self_seen = set()
 
393
        basis_seen = set()
 
394
        excluded_keys = set()
 
395
 
 
396
        def check_excluded(key_path):
 
397
            # Note that this is N^2, it depends on us trimming trees
 
398
            # aggressively to not become slow.
 
399
            # A better implementation would probably have a reverse map
 
400
            # back to the children of a node, and jump straight to it when
 
401
            # a common node is detected, the proceed to remove the already
 
402
            # pending children. breezy.graph has a searcher module with a
 
403
            # similar problem.
 
404
            while key_path is not None:
 
405
                key, key_path = key_path
 
406
                if key in excluded_keys:
 
407
                    return True
 
408
            return False
 
409
 
 
410
        loop_counter = 0
 
411
        while self_pending or basis_pending:
 
412
            loop_counter += 1
 
413
            if not self_pending:
 
414
                # self is exhausted: output remainder of basis
 
415
                for prefix, key, node, path in basis_pending:
 
416
                    if check_excluded(path):
 
417
                        continue
 
418
                    node = basis._get_node(node)
 
419
                    if key is not None:
 
420
                        # a value
 
421
                        yield (key, node, None)
 
422
                    else:
 
423
                        # subtree - fastpath the entire thing.
 
424
                        for key, value in node.iteritems(basis._store):
 
425
                            yield (key, value, None)
 
426
                return
 
427
            elif not basis_pending:
 
428
                # basis is exhausted: output remainder of self.
 
429
                for prefix, key, node, path in self_pending:
 
430
                    if check_excluded(path):
 
431
                        continue
 
432
                    node = self._get_node(node)
 
433
                    if key is not None:
 
434
                        # a value
 
435
                        yield (key, None, node)
 
436
                    else:
 
437
                        # subtree - fastpath the entire thing.
 
438
                        for key, value in node.iteritems(self._store):
 
439
                            yield (key, None, value)
 
440
                return
 
441
            else:
 
442
                # XXX: future optimisation - yield the smaller items
 
443
                # immediately rather than pushing everything on/off the
 
444
                # heaps. Applies to both internal nodes and leafnodes.
 
445
                if self_pending[0][0] < basis_pending[0][0]:
 
446
                    # expand self
 
447
                    prefix, key, node, path = heapq.heappop(self_pending)
 
448
                    if check_excluded(path):
 
449
                        continue
 
450
                    if key is not None:
 
451
                        # a value
 
452
                        yield (key, None, node)
 
453
                    else:
 
454
                        process_node(node, path, self, self_pending)
 
455
                        continue
 
456
                elif self_pending[0][0] > basis_pending[0][0]:
 
457
                    # expand basis
 
458
                    prefix, key, node, path = heapq.heappop(basis_pending)
 
459
                    if check_excluded(path):
 
460
                        continue
 
461
                    if key is not None:
 
462
                        # a value
 
463
                        yield (key, node, None)
 
464
                    else:
 
465
                        process_node(node, path, basis, basis_pending)
 
466
                        continue
 
467
                else:
 
468
                    # common prefix: possibly expand both
 
469
                    if self_pending[0][1] is None:
 
470
                        # process next self
 
471
                        read_self = True
 
472
                    else:
 
473
                        read_self = False
 
474
                    if basis_pending[0][1] is None:
 
475
                        # process next basis
 
476
                        read_basis = True
 
477
                    else:
 
478
                        read_basis = False
 
479
                    if not read_self and not read_basis:
 
480
                        # compare a common value
 
481
                        self_details = heapq.heappop(self_pending)
 
482
                        basis_details = heapq.heappop(basis_pending)
 
483
                        if self_details[2] != basis_details[2]:
 
484
                            yield (self_details[1],
 
485
                                   basis_details[2], self_details[2])
 
486
                        continue
 
487
                    # At least one side wasn't a simple value
 
488
                    if (self._node_key(self_pending[0][2])
 
489
                            == self._node_key(basis_pending[0][2])):
 
490
                        # Identical pointers, skip (and don't bother adding to
 
491
                        # excluded, it won't turn up again.
 
492
                        heapq.heappop(self_pending)
 
493
                        heapq.heappop(basis_pending)
 
494
                        continue
 
495
                    # Now we need to expand this node before we can continue
 
496
                    if read_self and read_basis:
 
497
                        # Both sides start with the same prefix, so process
 
498
                        # them in parallel
 
499
                        self_prefix, _, self_node, self_path = heapq.heappop(
 
500
                            self_pending)
 
501
                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
 
502
                            basis_pending)
 
503
                        if self_prefix != basis_prefix:
 
504
                            raise AssertionError(
 
505
                                '%r != %r' % (self_prefix, basis_prefix))
 
506
                        process_common_prefix_nodes(
 
507
                            self_node, self_path,
 
508
                            basis_node, basis_path)
 
509
                        continue
 
510
                    if read_self:
 
511
                        prefix, key, node, path = heapq.heappop(self_pending)
 
512
                        if check_excluded(path):
 
513
                            continue
 
514
                        process_node(node, path, self, self_pending)
 
515
                    if read_basis:
 
516
                        prefix, key, node, path = heapq.heappop(basis_pending)
 
517
                        if check_excluded(path):
 
518
                            continue
 
519
                        process_node(node, path, basis, basis_pending)
 
520
        # print loop_counter
 
521
 
 
522
    def iteritems(self, key_filter=None):
 
523
        """Iterate over the entire CHKMap's contents."""
 
524
        self._ensure_root()
 
525
        if key_filter is not None:
 
526
            as_st = StaticTuple.from_sequence
 
527
            key_filter = [as_st(key) for key in key_filter]
 
528
        return self._root_node.iteritems(self._store, key_filter=key_filter)
 
529
 
 
530
    def key(self):
 
531
        """Return the key for this map."""
 
532
        if isinstance(self._root_node, StaticTuple):
 
533
            return self._root_node
 
534
        else:
 
535
            return self._root_node._key
 
536
 
 
537
    def __len__(self):
 
538
        self._ensure_root()
 
539
        return len(self._root_node)
 
540
 
 
541
    def map(self, key, value):
 
542
        """Map a key tuple to value.
 
543
 
 
544
        :param key: A key to map.
 
545
        :param value: The value to assign to key.
 
546
        """
 
547
        key = StaticTuple.from_sequence(key)
 
548
        # Need a root object.
 
549
        self._ensure_root()
 
550
        prefix, node_details = self._root_node.map(self._store, key, value)
 
551
        if len(node_details) == 1:
 
552
            self._root_node = node_details[0][1]
 
553
        else:
 
554
            self._root_node = InternalNode(prefix,
 
555
                                           search_key_func=self._search_key_func)
 
556
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
 
557
            self._root_node._key_width = node_details[0][1]._key_width
 
558
            for split, node in node_details:
 
559
                self._root_node.add_node(split, node)
 
560
 
 
561
    def _node_key(self, node):
 
562
        """Get the key for a node whether it's a tuple or node."""
 
563
        if isinstance(node, tuple):
 
564
            node = StaticTuple.from_sequence(node)
 
565
        if isinstance(node, StaticTuple):
 
566
            return node
 
567
        else:
 
568
            return node._key
 
569
 
 
570
    def unmap(self, key, check_remap=True):
 
571
        """remove key from the map."""
 
572
        key = StaticTuple.from_sequence(key)
 
573
        self._ensure_root()
 
574
        if isinstance(self._root_node, InternalNode):
 
575
            unmapped = self._root_node.unmap(self._store, key,
 
576
                                             check_remap=check_remap)
 
577
        else:
 
578
            unmapped = self._root_node.unmap(self._store, key)
 
579
        self._root_node = unmapped
 
580
 
 
581
    def _check_remap(self):
 
582
        """Check if nodes can be collapsed."""
 
583
        self._ensure_root()
 
584
        if isinstance(self._root_node, InternalNode):
 
585
            self._root_node = self._root_node._check_remap(self._store)
 
586
 
 
587
    def _save(self):
 
588
        """Save the map completely.
 
589
 
 
590
        :return: The key of the root node.
 
591
        """
 
592
        if isinstance(self._root_node, StaticTuple):
 
593
            # Already saved.
 
594
            return self._root_node
 
595
        keys = list(self._root_node.serialise(self._store))
 
596
        return keys[-1]
 
597
 
 
598
 
 
599
class Node(object):
 
600
    """Base class defining the protocol for CHK Map nodes.
 
601
 
 
602
    :ivar _raw_size: The total size of the serialized key:value data, before
 
603
        adding the header bytes, and without prefix compression.
 
604
    """
 
605
 
 
606
    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
 
607
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
 
608
                 )
 
609
 
 
610
    def __init__(self, key_width=1):
 
611
        """Create a node.
 
612
 
 
613
        :param key_width: The width of keys for this node.
 
614
        """
 
615
        self._key = None
 
616
        # Current number of elements
 
617
        self._len = 0
 
618
        self._maximum_size = 0
 
619
        self._key_width = key_width
 
620
        # current size in bytes
 
621
        self._raw_size = 0
 
622
        # The pointers/values this node has - meaning defined by child classes.
 
623
        self._items = {}
 
624
        # The common search prefix
 
625
        self._search_prefix = None
 
626
 
 
627
    def __repr__(self):
 
628
        items_str = str(sorted(self._items))
 
629
        if len(items_str) > 20:
 
630
            items_str = items_str[:16] + '...]'
 
631
        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
 
632
            self.__class__.__name__, self._key, self._len, self._raw_size,
 
633
            self._maximum_size, self._search_prefix, items_str)
 
634
 
 
635
    def key(self):
 
636
        return self._key
 
637
 
 
638
    def __len__(self):
 
639
        return self._len
 
640
 
 
641
    @property
 
642
    def maximum_size(self):
 
643
        """What is the upper limit for adding references to a node."""
 
644
        return self._maximum_size
 
645
 
 
646
    def set_maximum_size(self, new_size):
 
647
        """Set the size threshold for nodes.
 
648
 
 
649
        :param new_size: The size at which no data is added to a node. 0 for
 
650
            unlimited.
 
651
        """
 
652
        self._maximum_size = new_size
 
653
 
 
654
    @classmethod
 
655
    def common_prefix(cls, prefix, key):
 
656
        """Given 2 strings, return the longest prefix common to both.
 
657
 
 
658
        :param prefix: This has been the common prefix for other keys, so it is
 
659
            more likely to be the common prefix in this case as well.
 
660
        :param key: Another string to compare to
 
661
        """
 
662
        if key.startswith(prefix):
 
663
            return prefix
 
664
        pos = -1
 
665
        # Is there a better way to do this?
 
666
        for pos, (left, right) in enumerate(zip(prefix, key)):
 
667
            if left != right:
 
668
                pos -= 1
 
669
                break
 
670
        common = prefix[:pos + 1]
 
671
        return common
 
672
 
 
673
    @classmethod
 
674
    def common_prefix_for_keys(cls, keys):
 
675
        """Given a list of keys, find their common prefix.
 
676
 
 
677
        :param keys: An iterable of strings.
 
678
        :return: The longest common prefix of all keys.
 
679
        """
 
680
        common_prefix = None
 
681
        for key in keys:
 
682
            if common_prefix is None:
 
683
                common_prefix = key
 
684
                continue
 
685
            common_prefix = cls.common_prefix(common_prefix, key)
 
686
            if not common_prefix:
 
687
                # if common_prefix is the empty string, then we know it won't
 
688
                # change further
 
689
                return b''
 
690
        return common_prefix
 
691
 
 
692
 
 
693
# Singleton indicating we have not computed _search_prefix yet
 
694
_unknown = object()
 
695
 
 
696
 
 
697
class LeafNode(Node):
 
698
    """A node containing actual key:value pairs.
 
699
 
 
700
    :ivar _items: A dict of key->value items. The key is in tuple form.
 
701
    :ivar _size: The number of bytes that would be used by serializing all of
 
702
        the key/value pairs.
 
703
    """
 
704
 
 
705
    __slots__ = ('_common_serialised_prefix',)
 
706
 
 
707
    def __init__(self, search_key_func=None):
 
708
        Node.__init__(self)
 
709
        # All of the keys in this leaf node share this common prefix
 
710
        self._common_serialised_prefix = None
 
711
        if search_key_func is None:
 
712
            self._search_key_func = _search_key_plain
 
713
        else:
 
714
            self._search_key_func = search_key_func
 
715
 
 
716
    def __repr__(self):
 
717
        items_str = str(sorted(self._items))
 
718
        if len(items_str) > 20:
 
719
            items_str = items_str[:16] + '...]'
 
720
        return \
 
721
            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
 
722
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
 
723
               self._maximum_size, self._search_prefix, self._key_width, items_str)
 
724
 
 
725
    def _current_size(self):
 
726
        """Answer the current serialised size of this node.
 
727
 
 
728
        This differs from self._raw_size in that it includes the bytes used for
 
729
        the header.
 
730
        """
 
731
        if self._common_serialised_prefix is None:
 
732
            bytes_for_items = 0
 
733
            prefix_len = 0
 
734
        else:
 
735
            # We will store a single string with the common prefix
 
736
            # And then that common prefix will not be stored in any of the
 
737
            # entry lines
 
738
            prefix_len = len(self._common_serialised_prefix)
 
739
            bytes_for_items = (self._raw_size - (prefix_len * self._len))
 
740
        return (9 +  # 'chkleaf:\n' +
 
741
                len(str(self._maximum_size)) + 1 +
 
742
                len(str(self._key_width)) + 1 +
 
743
                len(str(self._len)) + 1 +
 
744
                prefix_len + 1 +
 
745
                bytes_for_items)
 
746
 
 
747
    @classmethod
 
748
    def deserialise(klass, bytes, key, search_key_func=None):
 
749
        """Deserialise bytes, with key key, into a LeafNode.
 
750
 
 
751
        :param bytes: The bytes of the node.
 
752
        :param key: The key that the serialised node has.
 
753
        """
 
754
        key = static_tuple.expect_static_tuple(key)
 
755
        return _deserialise_leaf_node(bytes, key,
 
756
                                      search_key_func=search_key_func)
 
757
 
 
758
    def iteritems(self, store, key_filter=None):
 
759
        """Iterate over items in the node.
 
760
 
 
761
        :param key_filter: A filter to apply to the node. It should be a
 
762
            list/set/dict or similar repeatedly iterable container.
 
763
        """
 
764
        if key_filter is not None:
 
765
            # Adjust the filter - short elements go to a prefix filter. All
 
766
            # other items are looked up directly.
 
767
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
 
768
            filters = {}
 
769
            for key in key_filter:
 
770
                if len(key) == self._key_width:
 
771
                    # This filter is meant to match exactly one key, yield it
 
772
                    # if we have it.
 
773
                    try:
 
774
                        yield key, self._items[key]
 
775
                    except KeyError:
 
776
                        # This key is not present in this map, continue
 
777
                        pass
 
778
                else:
 
779
                    # Short items, we need to match based on a prefix
 
780
                    filters.setdefault(len(key), set()).add(key)
 
781
            if filters:
 
782
                filters_itemview = viewitems(filters)
 
783
                for item in viewitems(self._items):
 
784
                    for length, length_filter in filters_itemview:
 
785
                        if item[0][:length] in length_filter:
 
786
                            yield item
 
787
                            break
 
788
        else:
 
789
            for item in viewitems(self._items):
 
790
                yield item
 
791
 
 
792
    def _key_value_len(self, key, value):
 
793
        # TODO: Should probably be done without actually joining the key, but
 
794
        #       then that can be done via the C extension
 
795
        return (len(self._serialise_key(key)) + 1 +
 
796
                len(b'%d' % value.count(b'\n')) + 1 +
 
797
                len(value) + 1)
 
798
 
 
799
    def _search_key(self, key):
 
800
        return self._search_key_func(key)
 
801
 
 
802
    def _map_no_split(self, key, value):
 
803
        """Map a key to a value.
 
804
 
 
805
        This assumes either the key does not already exist, or you have already
 
806
        removed its size and length from self.
 
807
 
 
808
        :return: True if adding this node should cause us to split.
 
809
        """
 
810
        self._items[key] = value
 
811
        self._raw_size += self._key_value_len(key, value)
 
812
        self._len += 1
 
813
        serialised_key = self._serialise_key(key)
 
814
        if self._common_serialised_prefix is None:
 
815
            self._common_serialised_prefix = serialised_key
 
816
        else:
 
817
            self._common_serialised_prefix = self.common_prefix(
 
818
                self._common_serialised_prefix, serialised_key)
 
819
        search_key = self._search_key(key)
 
820
        if self._search_prefix is _unknown:
 
821
            self._compute_search_prefix()
 
822
        if self._search_prefix is None:
 
823
            self._search_prefix = search_key
 
824
        else:
 
825
            self._search_prefix = self.common_prefix(
 
826
                self._search_prefix, search_key)
 
827
        if (self._len > 1 and
 
828
            self._maximum_size and
 
829
                self._current_size() > self._maximum_size):
 
830
            # Check to see if all of the search_keys for this node are
 
831
            # identical. We allow the node to grow under that circumstance
 
832
            # (we could track this as common state, but it is infrequent)
 
833
            if (search_key != self._search_prefix or
 
834
                    not self._are_search_keys_identical()):
 
835
                return True
 
836
        return False
 
837
 
 
838
    def _split(self, store):
 
839
        """We have overflowed.
 
840
 
 
841
        Split this node into multiple LeafNodes, return it up the stack so that
 
842
        the next layer creates a new InternalNode and references the new nodes.
 
843
 
 
844
        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
 
845
        """
 
846
        if self._search_prefix is _unknown:
 
847
            raise AssertionError('Search prefix must be known')
 
848
        common_prefix = self._search_prefix
 
849
        split_at = len(common_prefix) + 1
 
850
        result = {}
 
851
        for key, value in viewitems(self._items):
 
852
            search_key = self._search_key(key)
 
853
            prefix = search_key[:split_at]
 
854
            # TODO: Generally only 1 key can be exactly the right length,
 
855
            #       which means we can only have 1 key in the node pointed
 
856
            #       at by the 'prefix\0' key. We might want to consider
 
857
            #       folding it into the containing InternalNode rather than
 
858
            #       having a fixed length-1 node.
 
859
            #       Note this is probably not true for hash keys, as they
 
860
            #       may get a '\00' node anywhere, but won't have keys of
 
861
            #       different lengths.
 
862
            if len(prefix) < split_at:
 
863
                prefix += b'\x00' * (split_at - len(prefix))
 
864
            if prefix not in result:
 
865
                node = LeafNode(search_key_func=self._search_key_func)
 
866
                node.set_maximum_size(self._maximum_size)
 
867
                node._key_width = self._key_width
 
868
                result[prefix] = node
 
869
            else:
 
870
                node = result[prefix]
 
871
            sub_prefix, node_details = node.map(store, key, value)
 
872
            if len(node_details) > 1:
 
873
                if prefix != sub_prefix:
 
874
                    # This node has been split and is now found via a different
 
875
                    # path
 
876
                    result.pop(prefix)
 
877
                new_node = InternalNode(sub_prefix,
 
878
                                        search_key_func=self._search_key_func)
 
879
                new_node.set_maximum_size(self._maximum_size)
 
880
                new_node._key_width = self._key_width
 
881
                for split, node in node_details:
 
882
                    new_node.add_node(split, node)
 
883
                result[prefix] = new_node
 
884
        return common_prefix, list(viewitems(result))
 
885
 
 
886
    def map(self, store, key, value):
 
887
        """Map key to value."""
 
888
        if key in self._items:
 
889
            self._raw_size -= self._key_value_len(key, self._items[key])
 
890
            self._len -= 1
 
891
        self._key = None
 
892
        if self._map_no_split(key, value):
 
893
            return self._split(store)
 
894
        else:
 
895
            if self._search_prefix is _unknown:
 
896
                raise AssertionError('%r must be known' % self._search_prefix)
 
897
            return self._search_prefix, [(b"", self)]
 
898
 
 
899
    _serialise_key = b'\x00'.join
 
900
 
 
901
    def serialise(self, store):
 
902
        """Serialise the LeafNode to store.
 
903
 
 
904
        :param store: A VersionedFiles honouring the CHK extensions.
 
905
        :return: An iterable of the keys inserted by this operation.
 
906
        """
 
907
        lines = [b"chkleaf:\n"]
 
908
        lines.append(b"%d\n" % self._maximum_size)
 
909
        lines.append(b"%d\n" % self._key_width)
 
910
        lines.append(b"%d\n" % self._len)
 
911
        if self._common_serialised_prefix is None:
 
912
            lines.append(b'\n')
 
913
            if len(self._items) != 0:
 
914
                raise AssertionError('If _common_serialised_prefix is None'
 
915
                                     ' we should have no items')
 
916
        else:
 
917
            lines.append(b'%s\n' % (self._common_serialised_prefix,))
 
918
            prefix_len = len(self._common_serialised_prefix)
 
919
        for key, value in sorted(viewitems(self._items)):
 
920
            # Always add a final newline
 
921
            value_lines = osutils.chunks_to_lines([value + b'\n'])
 
922
            serialized = b"%s\x00%d\n" % (self._serialise_key(key),
 
923
                                          len(value_lines))
 
924
            if not serialized.startswith(self._common_serialised_prefix):
 
925
                raise AssertionError('We thought the common prefix was %r'
 
926
                                     ' but entry %r does not have it in common'
 
927
                                     % (self._common_serialised_prefix, serialized))
 
928
            lines.append(serialized[prefix_len:])
 
929
            lines.extend(value_lines)
 
930
        sha1, _, _ = store.add_lines((None,), (), lines)
 
931
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
 
932
        data = b''.join(lines)
 
933
        if len(data) != self._current_size():
 
934
            raise AssertionError('Invalid _current_size')
 
935
        _get_cache()[self._key] = data
 
936
        return [self._key]
 
937
 
 
938
    def refs(self):
 
939
        """Return the references to other CHK's held by this node."""
 
940
        return []
 
941
 
 
942
    def _compute_search_prefix(self):
 
943
        """Determine the common search prefix for all keys in this node.
 
944
 
 
945
        :return: A bytestring of the longest search key prefix that is
 
946
            unique within this node.
 
947
        """
 
948
        search_keys = [self._search_key_func(key) for key in self._items]
 
949
        self._search_prefix = self.common_prefix_for_keys(search_keys)
 
950
        return self._search_prefix
 
951
 
 
952
    def _are_search_keys_identical(self):
 
953
        """Check to see if the search keys for all entries are the same.
 
954
 
 
955
        When using a hash as the search_key it is possible for non-identical
 
956
        keys to collide. If that happens enough, we may try overflow a
 
957
        LeafNode, but as all are collisions, we must not split.
 
958
        """
 
959
        common_search_key = None
 
960
        for key in self._items:
 
961
            search_key = self._search_key(key)
 
962
            if common_search_key is None:
 
963
                common_search_key = search_key
 
964
            elif search_key != common_search_key:
 
965
                return False
 
966
        return True
 
967
 
 
968
    def _compute_serialised_prefix(self):
 
969
        """Determine the common prefix for serialised keys in this node.
 
970
 
 
971
        :return: A bytestring of the longest serialised key prefix that is
 
972
            unique within this node.
 
973
        """
 
974
        serialised_keys = [self._serialise_key(key) for key in self._items]
 
975
        self._common_serialised_prefix = self.common_prefix_for_keys(
 
976
            serialised_keys)
 
977
        return self._common_serialised_prefix
 
978
 
 
979
    def unmap(self, store, key):
 
980
        """Unmap key from the node."""
 
981
        try:
 
982
            self._raw_size -= self._key_value_len(key, self._items[key])
 
983
        except KeyError:
 
984
            trace.mutter("key %s not found in %r", key, self._items)
 
985
            raise
 
986
        self._len -= 1
 
987
        del self._items[key]
 
988
        self._key = None
 
989
        # Recompute from scratch
 
990
        self._compute_search_prefix()
 
991
        self._compute_serialised_prefix()
 
992
        return self
 
993
 
 
994
 
 
995
class InternalNode(Node):
 
996
    """A node that contains references to other nodes.
 
997
 
 
998
    An InternalNode is responsible for mapping search key prefixes to child
 
999
    nodes.
 
1000
 
 
1001
    :ivar _items: serialised_key => node dictionary. node may be a tuple,
 
1002
        LeafNode or InternalNode.
 
1003
    """
 
1004
 
 
1005
    __slots__ = ('_node_width',)
 
1006
 
 
1007
    def __init__(self, prefix=b'', search_key_func=None):
 
1008
        Node.__init__(self)
 
1009
        # The size of an internalnode with default values and no children.
 
1010
        # How many octets key prefixes within this node are.
 
1011
        self._node_width = 0
 
1012
        self._search_prefix = prefix
 
1013
        if search_key_func is None:
 
1014
            self._search_key_func = _search_key_plain
 
1015
        else:
 
1016
            self._search_key_func = search_key_func
 
1017
 
 
1018
    def add_node(self, prefix, node):
 
1019
        """Add a child node with prefix prefix, and node node.
 
1020
 
 
1021
        :param prefix: The search key prefix for node.
 
1022
        :param node: The node being added.
 
1023
        """
 
1024
        if self._search_prefix is None:
 
1025
            raise AssertionError("_search_prefix should not be None")
 
1026
        if not prefix.startswith(self._search_prefix):
 
1027
            raise AssertionError("prefixes mismatch: %s must start with %s"
 
1028
                                 % (prefix, self._search_prefix))
 
1029
        if len(prefix) != len(self._search_prefix) + 1:
 
1030
            raise AssertionError("prefix wrong length: len(%s) is not %d" %
 
1031
                                 (prefix, len(self._search_prefix) + 1))
 
1032
        self._len += len(node)
 
1033
        if not len(self._items):
 
1034
            self._node_width = len(prefix)
 
1035
        if self._node_width != len(self._search_prefix) + 1:
 
1036
            raise AssertionError("node width mismatch: %d is not %d" %
 
1037
                                 (self._node_width, len(self._search_prefix) + 1))
 
1038
        self._items[prefix] = node
 
1039
        self._key = None
 
1040
 
 
1041
    def _current_size(self):
 
1042
        """Answer the current serialised size of this node."""
 
1043
        return (self._raw_size + len(str(self._len)) + len(str(self._key_width))
 
1044
                + len(str(self._maximum_size)))
 
1045
 
 
1046
    @classmethod
 
1047
    def deserialise(klass, bytes, key, search_key_func=None):
 
1048
        """Deserialise bytes to an InternalNode, with key key.
 
1049
 
 
1050
        :param bytes: The bytes of the node.
 
1051
        :param key: The key that the serialised node has.
 
1052
        :return: An InternalNode instance.
 
1053
        """
 
1054
        key = static_tuple.expect_static_tuple(key)
 
1055
        return _deserialise_internal_node(bytes, key,
 
1056
                                          search_key_func=search_key_func)
 
1057
 
 
1058
    def iteritems(self, store, key_filter=None):
 
1059
        for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
 
1060
            for item in node.iteritems(store, key_filter=node_filter):
 
1061
                yield item
 
1062
 
 
1063
    def _iter_nodes(self, store, key_filter=None, batch_size=None):
 
1064
        """Iterate over node objects which match key_filter.
 
1065
 
 
1066
        :param store: A store to use for accessing content.
 
1067
        :param key_filter: A key filter to filter nodes. Only nodes that might
 
1068
            contain a key in key_filter will be returned.
 
1069
        :param batch_size: If not None, then we will return the nodes that had
 
1070
            to be read using get_record_stream in batches, rather than reading
 
1071
            them all at once.
 
1072
        :return: An iterable of nodes. This function does not have to be fully
 
1073
            consumed.  (There will be no pending I/O when items are being returned.)
 
1074
        """
 
1075
        # Map from chk key ('sha1:...',) to (prefix, key_filter)
 
1076
        # prefix is the key in self._items to use, key_filter is the key_filter
 
1077
        # entries that would match this node
 
1078
        keys = {}
 
1079
        shortcut = False
 
1080
        if key_filter is None:
 
1081
            # yielding all nodes, yield whatever we have, and queue up a read
 
1082
            # for whatever we are missing
 
1083
            shortcut = True
 
1084
            for prefix, node in viewitems(self._items):
 
1085
                if node.__class__ is StaticTuple:
 
1086
                    keys[node] = (prefix, None)
 
1087
                else:
 
1088
                    yield node, None
 
1089
        elif len(key_filter) == 1:
 
1090
            # Technically, this path could also be handled by the first check
 
1091
            # in 'self._node_width' in length_filters. However, we can handle
 
1092
            # this case without spending any time building up the
 
1093
            # prefix_to_keys, etc state.
 
1094
 
 
1095
            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
 
1096
            # 0.626us   list(key_filter)[0]
 
1097
            #       is a func() for list(), 2 mallocs, and a getitem
 
1098
            # 0.489us   [k for k in key_filter][0]
 
1099
            #       still has the mallocs, avoids the func() call
 
1100
            # 0.350us   iter(key_filter).next()
 
1101
            #       has a func() call, and mallocs an iterator
 
1102
            # 0.125us   for key in key_filter: pass
 
1103
            #       no func() overhead, might malloc an iterator
 
1104
            # 0.105us   for key in key_filter: break
 
1105
            #       no func() overhead, might malloc an iterator, probably
 
1106
            #       avoids checking an 'else' clause as part of the for
 
1107
            for key in key_filter:
 
1108
                break
 
1109
            search_prefix = self._search_prefix_filter(key)
 
1110
            if len(search_prefix) == self._node_width:
 
1111
                # This item will match exactly, so just do a dict lookup, and
 
1112
                # see what we can return
 
1113
                shortcut = True
 
1114
                try:
 
1115
                    node = self._items[search_prefix]
 
1116
                except KeyError:
 
1117
                    # A given key can only match 1 child node, if it isn't
 
1118
                    # there, then we can just return nothing
 
1119
                    return
 
1120
                if node.__class__ is StaticTuple:
 
1121
                    keys[node] = (search_prefix, [key])
 
1122
                else:
 
1123
                    # This is loaded, and the only thing that can match,
 
1124
                    # return
 
1125
                    yield node, [key]
 
1126
                    return
 
1127
        if not shortcut:
 
1128
            # First, convert all keys into a list of search prefixes
 
1129
            # Aggregate common prefixes, and track the keys they come from
 
1130
            prefix_to_keys = {}
 
1131
            length_filters = {}
 
1132
            for key in key_filter:
 
1133
                search_prefix = self._search_prefix_filter(key)
 
1134
                length_filter = length_filters.setdefault(
 
1135
                    len(search_prefix), set())
 
1136
                length_filter.add(search_prefix)
 
1137
                prefix_to_keys.setdefault(search_prefix, []).append(key)
 
1138
 
 
1139
            if (self._node_width in length_filters and
 
1140
                    len(length_filters) == 1):
 
1141
                # all of the search prefixes match exactly _node_width. This
 
1142
                # means that everything is an exact match, and we can do a
 
1143
                # lookup into self._items, rather than iterating over the items
 
1144
                # dict.
 
1145
                search_prefixes = length_filters[self._node_width]
 
1146
                for search_prefix in search_prefixes:
 
1147
                    try:
 
1148
                        node = self._items[search_prefix]
 
1149
                    except KeyError:
 
1150
                        # We can ignore this one
 
1151
                        continue
 
1152
                    node_key_filter = prefix_to_keys[search_prefix]
 
1153
                    if node.__class__ is StaticTuple:
 
1154
                        keys[node] = (search_prefix, node_key_filter)
 
1155
                    else:
 
1156
                        yield node, node_key_filter
 
1157
            else:
 
1158
                # The slow way. We walk every item in self._items, and check to
 
1159
                # see if there are any matches
 
1160
                length_filters_itemview = viewitems(length_filters)
 
1161
                for prefix, node in viewitems(self._items):
 
1162
                    node_key_filter = []
 
1163
                    for length, length_filter in length_filters_itemview:
 
1164
                        sub_prefix = prefix[:length]
 
1165
                        if sub_prefix in length_filter:
 
1166
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
 
1167
                    if node_key_filter:  # this key matched something, yield it
 
1168
                        if node.__class__ is StaticTuple:
 
1169
                            keys[node] = (prefix, node_key_filter)
 
1170
                        else:
 
1171
                            yield node, node_key_filter
 
1172
        if keys:
 
1173
            # Look in the page cache for some more bytes
 
1174
            found_keys = set()
 
1175
            for key in keys:
 
1176
                try:
 
1177
                    bytes = _get_cache()[key]
 
1178
                except KeyError:
 
1179
                    continue
 
1180
                else:
 
1181
                    node = _deserialise(bytes, key,
 
1182
                                        search_key_func=self._search_key_func)
 
1183
                    prefix, node_key_filter = keys[key]
 
1184
                    self._items[prefix] = node
 
1185
                    found_keys.add(key)
 
1186
                    yield node, node_key_filter
 
1187
            for key in found_keys:
 
1188
                del keys[key]
 
1189
        if keys:
 
1190
            # demand load some pages.
 
1191
            if batch_size is None:
 
1192
                # Read all the keys in
 
1193
                batch_size = len(keys)
 
1194
            key_order = list(keys)
 
1195
            for batch_start in range(0, len(key_order), batch_size):
 
1196
                batch = key_order[batch_start:batch_start + batch_size]
 
1197
                # We have to fully consume the stream so there is no pending
 
1198
                # I/O, so we buffer the nodes for now.
 
1199
                stream = store.get_record_stream(batch, 'unordered', True)
 
1200
                node_and_filters = []
 
1201
                for record in stream:
 
1202
                    bytes = record.get_bytes_as('fulltext')
 
1203
                    node = _deserialise(bytes, record.key,
 
1204
                                        search_key_func=self._search_key_func)
 
1205
                    prefix, node_key_filter = keys[record.key]
 
1206
                    node_and_filters.append((node, node_key_filter))
 
1207
                    self._items[prefix] = node
 
1208
                    _get_cache()[record.key] = bytes
 
1209
                for info in node_and_filters:
 
1210
                    yield info
 
1211
 
 
1212
    def map(self, store, key, value):
 
1213
        """Map key to value."""
 
1214
        if not len(self._items):
 
1215
            raise AssertionError("can't map in an empty InternalNode.")
 
1216
        search_key = self._search_key(key)
 
1217
        if self._node_width != len(self._search_prefix) + 1:
 
1218
            raise AssertionError("node width mismatch: %d is not %d" %
 
1219
                                 (self._node_width, len(self._search_prefix) + 1))
 
1220
        if not search_key.startswith(self._search_prefix):
 
1221
            # This key doesn't fit in this index, so we need to split at the
 
1222
            # point where it would fit, insert self into that internal node,
 
1223
            # and then map this key into that node.
 
1224
            new_prefix = self.common_prefix(self._search_prefix,
 
1225
                                            search_key)
 
1226
            new_parent = InternalNode(new_prefix,
 
1227
                                      search_key_func=self._search_key_func)
 
1228
            new_parent.set_maximum_size(self._maximum_size)
 
1229
            new_parent._key_width = self._key_width
 
1230
            new_parent.add_node(self._search_prefix[:len(new_prefix) + 1],
 
1231
                                self)
 
1232
            return new_parent.map(store, key, value)
 
1233
        children = [node for node, _ in self._iter_nodes(
 
1234
            store, key_filter=[key])]
 
1235
        if children:
 
1236
            child = children[0]
 
1237
        else:
 
1238
            # new child needed:
 
1239
            child = self._new_child(search_key, LeafNode)
 
1240
        old_len = len(child)
 
1241
        if isinstance(child, LeafNode):
 
1242
            old_size = child._current_size()
 
1243
        else:
 
1244
            old_size = None
 
1245
        prefix, node_details = child.map(store, key, value)
 
1246
        if len(node_details) == 1:
 
1247
            # child may have shrunk, or might be a new node
 
1248
            child = node_details[0][1]
 
1249
            self._len = self._len - old_len + len(child)
 
1250
            self._items[search_key] = child
 
1251
            self._key = None
 
1252
            new_node = self
 
1253
            if isinstance(child, LeafNode):
 
1254
                if old_size is None:
 
1255
                    # The old node was an InternalNode which means it has now
 
1256
                    # collapsed, so we need to check if it will chain to a
 
1257
                    # collapse at this level.
 
1258
                    trace.mutter("checking remap as InternalNode -> LeafNode")
 
1259
                    new_node = self._check_remap(store)
 
1260
                else:
 
1261
                    # If the LeafNode has shrunk in size, we may want to run
 
1262
                    # a remap check. Checking for a remap is expensive though
 
1263
                    # and the frequency of a successful remap is very low.
 
1264
                    # Shrinkage by small amounts is common, so we only do the
 
1265
                    # remap check if the new_size is low or the shrinkage
 
1266
                    # amount is over a configurable limit.
 
1267
                    new_size = child._current_size()
 
1268
                    shrinkage = old_size - new_size
 
1269
                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE or
 
1270
                            shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
 
1271
                        trace.mutter(
 
1272
                            "checking remap as size shrunk by %d to be %d",
 
1273
                            shrinkage, new_size)
 
1274
                        new_node = self._check_remap(store)
 
1275
            if new_node._search_prefix is None:
 
1276
                raise AssertionError("_search_prefix should not be None")
 
1277
            return new_node._search_prefix, [(b'', new_node)]
 
1278
        # child has overflown - create a new intermediate node.
 
1279
        # XXX: This is where we might want to try and expand our depth
 
1280
        # to refer to more bytes of every child (which would give us
 
1281
        # multiple pointers to child nodes, but less intermediate nodes)
 
1282
        child = self._new_child(search_key, InternalNode)
 
1283
        child._search_prefix = prefix
 
1284
        for split, node in node_details:
 
1285
            child.add_node(split, node)
 
1286
        self._len = self._len - old_len + len(child)
 
1287
        self._key = None
 
1288
        return self._search_prefix, [(b"", self)]
 
1289
 
 
1290
    def _new_child(self, search_key, klass):
 
1291
        """Create a new child node of type klass."""
 
1292
        child = klass()
 
1293
        child.set_maximum_size(self._maximum_size)
 
1294
        child._key_width = self._key_width
 
1295
        child._search_key_func = self._search_key_func
 
1296
        self._items[search_key] = child
 
1297
        return child
 
1298
 
 
1299
    def serialise(self, store):
 
1300
        """Serialise the node to store.
 
1301
 
 
1302
        :param store: A VersionedFiles honouring the CHK extensions.
 
1303
        :return: An iterable of the keys inserted by this operation.
 
1304
        """
 
1305
        for node in viewvalues(self._items):
 
1306
            if isinstance(node, StaticTuple):
 
1307
                # Never deserialised.
 
1308
                continue
 
1309
            if node._key is not None:
 
1310
                # Never altered
 
1311
                continue
 
1312
            for key in node.serialise(store):
 
1313
                yield key
 
1314
        lines = [b"chknode:\n"]
 
1315
        lines.append(b"%d\n" % self._maximum_size)
 
1316
        lines.append(b"%d\n" % self._key_width)
 
1317
        lines.append(b"%d\n" % self._len)
 
1318
        if self._search_prefix is None:
 
1319
            raise AssertionError("_search_prefix should not be None")
 
1320
        lines.append(b'%s\n' % (self._search_prefix,))
 
1321
        prefix_len = len(self._search_prefix)
 
1322
        for prefix, node in sorted(viewitems(self._items)):
 
1323
            if isinstance(node, StaticTuple):
 
1324
                key = node[0]
 
1325
            else:
 
1326
                key = node._key[0]
 
1327
            serialised = b"%s\x00%s\n" % (prefix, key)
 
1328
            if not serialised.startswith(self._search_prefix):
 
1329
                raise AssertionError("prefixes mismatch: %s must start with %s"
 
1330
                                     % (serialised, self._search_prefix))
 
1331
            lines.append(serialised[prefix_len:])
 
1332
        sha1, _, _ = store.add_lines((None,), (), lines)
 
1333
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
 
1334
        _get_cache()[self._key] = b''.join(lines)
 
1335
        yield self._key
 
1336
 
 
1337
    def _search_key(self, key):
 
1338
        """Return the serialised key for key in this node."""
 
1339
        # search keys are fixed width. All will be self._node_width wide, so we
 
1340
        # pad as necessary.
 
1341
        return (self._search_key_func(key) + b'\x00' * self._node_width)[:self._node_width]
 
1342
 
 
1343
    def _search_prefix_filter(self, key):
 
1344
        """Serialise key for use as a prefix filter in iteritems."""
 
1345
        return self._search_key_func(key)[:self._node_width]
 
1346
 
 
1347
    def _split(self, offset):
 
1348
        """Split this node into smaller nodes starting at offset.
 
1349
 
 
1350
        :param offset: The offset to start the new child nodes at.
 
1351
        :return: An iterable of (prefix, node) tuples. prefix is a byte
 
1352
            prefix for reaching node.
 
1353
        """
 
1354
        if offset >= self._node_width:
 
1355
            for node in valueview(self._items):
 
1356
                for result in node._split(offset):
 
1357
                    yield result
 
1358
 
 
1359
    def refs(self):
 
1360
        """Return the references to other CHK's held by this node."""
 
1361
        if self._key is None:
 
1362
            raise AssertionError("unserialised nodes have no refs.")
 
1363
        refs = []
 
1364
        for value in viewvalues(self._items):
 
1365
            if isinstance(value, StaticTuple):
 
1366
                refs.append(value)
 
1367
            else:
 
1368
                refs.append(value.key())
 
1369
        return refs
 
1370
 
 
1371
    def _compute_search_prefix(self, extra_key=None):
 
1372
        """Return the unique key prefix for this node.
 
1373
 
 
1374
        :return: A bytestring of the longest search key prefix that is
 
1375
            unique within this node.
 
1376
        """
 
1377
        self._search_prefix = self.common_prefix_for_keys(self._items)
 
1378
        return self._search_prefix
 
1379
 
 
1380
    def unmap(self, store, key, check_remap=True):
 
1381
        """Remove key from this node and its children."""
 
1382
        if not len(self._items):
 
1383
            raise AssertionError("can't unmap in an empty InternalNode.")
 
1384
        children = [node for node, _
 
1385
                    in self._iter_nodes(store, key_filter=[key])]
 
1386
        if children:
 
1387
            child = children[0]
 
1388
        else:
 
1389
            raise KeyError(key)
 
1390
        self._len -= 1
 
1391
        unmapped = child.unmap(store, key)
 
1392
        self._key = None
 
1393
        search_key = self._search_key(key)
 
1394
        if len(unmapped) == 0:
 
1395
            # All child nodes are gone, remove the child:
 
1396
            del self._items[search_key]
 
1397
            unmapped = None
 
1398
        else:
 
1399
            # Stash the returned node
 
1400
            self._items[search_key] = unmapped
 
1401
        if len(self._items) == 1:
 
1402
            # this node is no longer needed:
 
1403
            return list(viewvalues(self._items))[0]
 
1404
        if isinstance(unmapped, InternalNode):
 
1405
            return self
 
1406
        if check_remap:
 
1407
            return self._check_remap(store)
 
1408
        else:
 
1409
            return self
 
1410
 
 
1411
    def _check_remap(self, store):
 
1412
        """Check if all keys contained by children fit in a single LeafNode.
 
1413
 
 
1414
        :param store: A store to use for reading more nodes
 
1415
        :return: Either self, or a new LeafNode which should replace self.
 
1416
        """
 
1417
        # Logic for how we determine when we need to rebuild
 
1418
        # 1) Implicitly unmap() is removing a key which means that the child
 
1419
        #    nodes are going to be shrinking by some extent.
 
1420
        # 2) If all children are LeafNodes, it is possible that they could be
 
1421
        #    combined into a single LeafNode, which can then completely replace
 
1422
        #    this internal node with a single LeafNode
 
1423
        # 3) If *one* child is an InternalNode, we assume it has already done
 
1424
        #    all the work to determine that its children cannot collapse, and
 
1425
        #    we can then assume that those nodes *plus* the current nodes don't
 
1426
        #    have a chance of collapsing either.
 
1427
        #    So a very cheap check is to just say if 'unmapped' is an
 
1428
        #    InternalNode, we don't have to check further.
 
1429
 
 
1430
        # TODO: Another alternative is to check the total size of all known
 
1431
        #       LeafNodes. If there is some formula we can use to determine the
 
1432
        #       final size without actually having to read in any more
 
1433
        #       children, it would be nice to have. However, we have to be
 
1434
        #       careful with stuff like nodes that pull out the common prefix
 
1435
        #       of each key, as adding a new key can change the common prefix
 
1436
        #       and cause size changes greater than the length of one key.
 
1437
        #       So for now, we just add everything to a new Leaf until it
 
1438
        #       splits, as we know that will give the right answer
 
1439
        new_leaf = LeafNode(search_key_func=self._search_key_func)
 
1440
        new_leaf.set_maximum_size(self._maximum_size)
 
1441
        new_leaf._key_width = self._key_width
 
1442
        # A batch_size of 16 was chosen because:
 
1443
        #   a) In testing, a 4k page held 14 times. So if we have more than 16
 
1444
        #      leaf nodes we are unlikely to hold them in a single new leaf
 
1445
        #      node. This still allows for 1 round trip
 
1446
        #   b) With 16-way fan out, we can still do a single round trip
 
1447
        #   c) With 255-way fan out, we don't want to read all 255 and destroy
 
1448
        #      the page cache, just to determine that we really don't need it.
 
1449
        for node, _ in self._iter_nodes(store, batch_size=16):
 
1450
            if isinstance(node, InternalNode):
 
1451
                # Without looking at any leaf nodes, we are sure
 
1452
                return self
 
1453
            for key, value in viewitems(node._items):
 
1454
                if new_leaf._map_no_split(key, value):
 
1455
                    return self
 
1456
        trace.mutter("remap generated a new LeafNode")
 
1457
        return new_leaf
 
1458
 
 
1459
 
 
1460
def _deserialise(data, key, search_key_func):
 
1461
    """Helper for repositorydetails - convert bytes to a node."""
 
1462
    if data.startswith(b"chkleaf:\n"):
 
1463
        node = LeafNode.deserialise(data, key, search_key_func=search_key_func)
 
1464
    elif data.startswith(b"chknode:\n"):
 
1465
        node = InternalNode.deserialise(data, key,
 
1466
                                        search_key_func=search_key_func)
 
1467
    else:
 
1468
        raise AssertionError("Unknown node type.")
 
1469
    return node
 
1470
 
 
1471
 
 
1472
class CHKMapDifference(object):
 
1473
    """Iterate the stored pages and key,value pairs for (new - old).
 
1474
 
 
1475
    This class provides a generator over the stored CHK pages and the
 
1476
    (key, value) pairs that are in any of the new maps and not in any of the
 
1477
    old maps.
 
1478
 
 
1479
    Note that it may yield chk pages that are common (especially root nodes),
 
1480
    but it won't yield (key,value) pairs that are common.
 
1481
    """
 
1482
 
 
1483
    def __init__(self, store, new_root_keys, old_root_keys,
 
1484
                 search_key_func, pb=None):
 
1485
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
 
1486
        #       force callers to use StaticTuple, because there will often be
 
1487
        #       lots of keys passed in here. And even if we cast it locally,
 
1488
        #       that just meanst that we will have *both* a StaticTuple and a
 
1489
        #       tuple() in memory, referring to the same object. (so a net
 
1490
        #       increase in memory, not a decrease.)
 
1491
        self._store = store
 
1492
        self._new_root_keys = new_root_keys
 
1493
        self._old_root_keys = old_root_keys
 
1494
        self._pb = pb
 
1495
        # All uninteresting chks that we have seen. By the time they are added
 
1496
        # here, they should be either fully ignored, or queued up for
 
1497
        # processing
 
1498
        # TODO: This might grow to a large size if there are lots of merge
 
1499
        #       parents, etc. However, it probably doesn't scale to O(history)
 
1500
        #       like _processed_new_refs does.
 
1501
        self._all_old_chks = set(self._old_root_keys)
 
1502
        # All items that we have seen from the old_root_keys
 
1503
        self._all_old_items = set()
 
1504
        # These are interesting items which were either read, or already in the
 
1505
        # interesting queue (so we don't need to walk them again)
 
1506
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
 
1507
        #       SimpleSet here.
 
1508
        self._processed_new_refs = set()
 
1509
        self._search_key_func = search_key_func
 
1510
 
 
1511
        # The uninteresting and interesting nodes to be searched
 
1512
        self._old_queue = []
 
1513
        self._new_queue = []
 
1514
        # Holds the (key, value) items found when processing the root nodes,
 
1515
        # waiting for the uninteresting nodes to be walked
 
1516
        self._new_item_queue = []
 
1517
        self._state = None
 
1518
 
 
1519
    def _read_nodes_from_store(self, keys):
 
1520
        # We chose not to use _get_cache(), because we think in
 
1521
        # terms of records to be yielded. Also, we expect to touch each page
 
1522
        # only 1 time during this code. (We may want to evaluate saving the
 
1523
        # raw bytes into the page cache, which would allow a working tree
 
1524
        # update after the fetch to not have to read the bytes again.)
 
1525
        as_st = StaticTuple.from_sequence
 
1526
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1527
        for record in stream:
 
1528
            if self._pb is not None:
 
1529
                self._pb.tick()
 
1530
            if record.storage_kind == 'absent':
 
1531
                raise errors.NoSuchRevision(self._store, record.key)
 
1532
            bytes = record.get_bytes_as('fulltext')
 
1533
            node = _deserialise(bytes, record.key,
 
1534
                                search_key_func=self._search_key_func)
 
1535
            if isinstance(node, InternalNode):
 
1536
                # Note we don't have to do node.refs() because we know that
 
1537
                # there are no children that have been pushed into this node
 
1538
                # Note: Using as_st() here seemed to save 1.2MB, which would
 
1539
                #       indicate that we keep 100k prefix_refs around while
 
1540
                #       processing. They *should* be shorter lived than that...
 
1541
                #       It does cost us ~10s of processing time
 
1542
                prefix_refs = list(viewitems(node._items))
 
1543
                items = []
 
1544
            else:
 
1545
                prefix_refs = []
 
1546
                # Note: We don't use a StaticTuple here. Profiling showed a
 
1547
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
 
1548
                #       But a significant slowdown (15s / 145s, or 10%)
 
1549
                items = list(viewitems(node._items))
 
1550
            yield record, node, prefix_refs, items
 
1551
 
 
1552
    def _read_old_roots(self):
 
1553
        old_chks_to_enqueue = []
 
1554
        all_old_chks = self._all_old_chks
 
1555
        for record, node, prefix_refs, items in \
 
1556
                self._read_nodes_from_store(self._old_root_keys):
 
1557
            # Uninteresting node
 
1558
            prefix_refs = [p_r for p_r in prefix_refs
 
1559
                           if p_r[1] not in all_old_chks]
 
1560
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1561
            all_old_chks.update(new_refs)
 
1562
            # TODO: This might be a good time to turn items into StaticTuple
 
1563
            #       instances and possibly intern them. However, this does not
 
1564
            #       impact 'initial branch' performance, so I'm not worrying
 
1565
            #       about this yet
 
1566
            self._all_old_items.update(items)
 
1567
            # Queue up the uninteresting references
 
1568
            # Don't actually put them in the 'to-read' queue until we have
 
1569
            # finished checking the interesting references
 
1570
            old_chks_to_enqueue.extend(prefix_refs)
 
1571
        return old_chks_to_enqueue
 
1572
 
 
1573
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1574
        # At this point, we have read all the uninteresting and interesting
 
1575
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1576
        # handled the interesting ones
 
1577
        for prefix, ref in old_chks_to_enqueue:
 
1578
            not_interesting = True
 
1579
            for i in range(len(prefix), 0, -1):
 
1580
                if prefix[:i] in new_prefixes:
 
1581
                    not_interesting = False
 
1582
                    break
 
1583
            if not_interesting:
 
1584
                # This prefix is not part of the remaining 'interesting set'
 
1585
                continue
 
1586
            self._old_queue.append(ref)
 
1587
 
 
1588
    def _read_all_roots(self):
 
1589
        """Read the root pages.
 
1590
 
 
1591
        This is structured as a generator, so that the root records can be
 
1592
        yielded up to whoever needs them without any buffering.
 
1593
        """
 
1594
        # This is the bootstrap phase
 
1595
        if not self._old_root_keys:
 
1596
            # With no old_root_keys we can just shortcut and be ready
 
1597
            # for _flush_new_queue
 
1598
            self._new_queue = list(self._new_root_keys)
 
1599
            return
 
1600
        old_chks_to_enqueue = self._read_old_roots()
 
1601
        # filter out any root keys that are already known to be uninteresting
 
1602
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1603
        # These are prefixes that are present in new_keys that we are
 
1604
        # thinking to yield
 
1605
        new_prefixes = set()
 
1606
        # We are about to yield all of these, so we don't want them getting
 
1607
        # added a second time
 
1608
        processed_new_refs = self._processed_new_refs
 
1609
        processed_new_refs.update(new_keys)
 
1610
        for record, node, prefix_refs, items in \
 
1611
                self._read_nodes_from_store(new_keys):
 
1612
            # At this level, we now know all the uninteresting references
 
1613
            # So we filter and queue up whatever is remaining
 
1614
            prefix_refs = [p_r for p_r in prefix_refs
 
1615
                           if p_r[1] not in self._all_old_chks and
 
1616
                           p_r[1] not in processed_new_refs]
 
1617
            refs = [p_r[1] for p_r in prefix_refs]
 
1618
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1619
            self._new_queue.extend(refs)
 
1620
            # TODO: We can potentially get multiple items here, however the
 
1621
            #       current design allows for this, as callers will do the work
 
1622
            #       to make the results unique. We might profile whether we
 
1623
            #       gain anything by ensuring unique return values for items
 
1624
            # TODO: This might be a good time to cast to StaticTuple, as
 
1625
            #       self._new_item_queue will hold the contents of multiple
 
1626
            #       records for an extended lifetime
 
1627
            new_items = [item for item in items
 
1628
                         if item not in self._all_old_items]
 
1629
            self._new_item_queue.extend(new_items)
 
1630
            new_prefixes.update([self._search_key_func(item[0])
 
1631
                                 for item in new_items])
 
1632
            processed_new_refs.update(refs)
 
1633
            yield record
 
1634
        # For new_prefixes we have the full length prefixes queued up.
 
1635
        # However, we also need possible prefixes. (If we have a known ref to
 
1636
        # 'ab', then we also need to include 'a'.) So expand the
 
1637
        # new_prefixes to include all shorter prefixes
 
1638
        for prefix in list(new_prefixes):
 
1639
            new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
 
1640
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1641
 
 
1642
    def _flush_new_queue(self):
 
1643
        # No need to maintain the heap invariant anymore, just pull things out
 
1644
        # and process them
 
1645
        refs = set(self._new_queue)
 
1646
        self._new_queue = []
 
1647
        # First pass, flush all interesting items and convert to using direct refs
 
1648
        all_old_chks = self._all_old_chks
 
1649
        processed_new_refs = self._processed_new_refs
 
1650
        all_old_items = self._all_old_items
 
1651
        new_items = [item for item in self._new_item_queue
 
1652
                     if item not in all_old_items]
 
1653
        self._new_item_queue = []
 
1654
        if new_items:
 
1655
            yield None, new_items
 
1656
        refs = refs.difference(all_old_chks)
 
1657
        processed_new_refs.update(refs)
 
1658
        while refs:
 
1659
            # TODO: Using a SimpleSet for self._processed_new_refs and
 
1660
            #       saved as much as 10MB of peak memory. However, it requires
 
1661
            #       implementing a non-pyrex version.
 
1662
            next_refs = set()
 
1663
            next_refs_update = next_refs.update
 
1664
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1665
            # from 1m54s to 1m51s. Consider it.
 
1666
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1667
                if all_old_items:
 
1668
                    # using the 'if' check saves about 145s => 141s, when
 
1669
                    # streaming initial branch of Launchpad data.
 
1670
                    items = [item for item in items
 
1671
                             if item not in all_old_items]
 
1672
                yield record, items
 
1673
                next_refs_update([p_r[1] for p_r in p_refs])
 
1674
                del p_refs
 
1675
            # set1.difference(set/dict) walks all of set1, and checks if it
 
1676
            # exists in 'other'.
 
1677
            # set1.difference(iterable) walks all of iterable, and does a
 
1678
            # 'difference_update' on a clone of set1. Pick wisely based on the
 
1679
            # expected sizes of objects.
 
1680
            # in our case it is expected that 'new_refs' will always be quite
 
1681
            # small.
 
1682
            next_refs = next_refs.difference(all_old_chks)
 
1683
            next_refs = next_refs.difference(processed_new_refs)
 
1684
            processed_new_refs.update(next_refs)
 
1685
            refs = next_refs
 
1686
 
 
1687
    def _process_next_old(self):
 
1688
        # Since we don't filter uninteresting any further than during
 
1689
        # _read_all_roots, process the whole queue in a single pass.
 
1690
        refs = self._old_queue
 
1691
        self._old_queue = []
 
1692
        all_old_chks = self._all_old_chks
 
1693
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1694
            # TODO: Use StaticTuple here?
 
1695
            self._all_old_items.update(items)
 
1696
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
 
1697
            self._old_queue.extend(refs)
 
1698
            all_old_chks.update(refs)
 
1699
 
 
1700
    def _process_queues(self):
 
1701
        while self._old_queue:
 
1702
            self._process_next_old()
 
1703
        return self._flush_new_queue()
 
1704
 
 
1705
    def process(self):
 
1706
        for record in self._read_all_roots():
 
1707
            yield record, []
 
1708
        for record, items in self._process_queues():
 
1709
            yield record, items
 
1710
 
 
1711
 
 
1712
def iter_interesting_nodes(store, interesting_root_keys,
 
1713
                           uninteresting_root_keys, pb=None):
 
1714
    """Given root keys, find interesting nodes.
 
1715
 
 
1716
    Evaluate nodes referenced by interesting_root_keys. Ones that are also
 
1717
    referenced from uninteresting_root_keys are not considered interesting.
 
1718
 
 
1719
    :param interesting_root_keys: keys which should be part of the
 
1720
        "interesting" nodes (which will be yielded)
 
1721
    :param uninteresting_root_keys: keys which should be filtered out of the
 
1722
        result set.
 
1723
    :return: Yield
 
1724
        (interesting record, {interesting key:values})
 
1725
    """
 
1726
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1727
                                uninteresting_root_keys,
 
1728
                                search_key_func=store._search_key_func,
 
1729
                                pb=pb)
 
1730
    return iterator.process()
 
1731
 
 
1732
 
 
1733
try:
 
1734
    from ._chk_map_pyx import (
 
1735
        _bytes_to_text_key,
 
1736
        _search_key_16,
 
1737
        _search_key_255,
 
1738
        _deserialise_leaf_node,
 
1739
        _deserialise_internal_node,
 
1740
        )
 
1741
except ImportError as e:
 
1742
    osutils.failed_to_load_extension(e)
 
1743
    from ._chk_map_py import (
 
1744
        _bytes_to_text_key,
 
1745
        _search_key_16,
 
1746
        _search_key_255,
 
1747
        _deserialise_leaf_node,
 
1748
        _deserialise_internal_node,
 
1749
        )  # noqa: F401
 
1750
search_key_registry.register(b'hash-16-way', _search_key_16)
 
1751
search_key_registry.register(b'hash-255-way', _search_key_255)
 
1752
 
 
1753
 
 
1754
def _check_key(key):
 
1755
    """Helper function to assert that a key is properly formatted.
 
1756
 
 
1757
    This generally shouldn't be used in production code, but it can be helpful
 
1758
    to debug problems.
 
1759
    """
 
1760
    if not isinstance(key, StaticTuple):
 
1761
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
 
1762
    if len(key) != 1:
 
1763
        raise ValueError('key %r should have length 1, not %d' %
 
1764
                         (key, len(key),))
 
1765
    if not isinstance(key[0], str):
 
1766
        raise TypeError('key %r should hold a str, not %r'
 
1767
                        % (key, type(key[0])))
 
1768
    if not key[0].startswith('sha1:'):
 
1769
        raise ValueError('key %r should point to a sha1:' % (key,))