/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-19 10:58:39 UTC
  • mfrom: (6383 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6386.
  • Revision ID: jelmer@canonical.com-20111219105839-uji05ck4rkm1mj4j
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from __future__ import absolute_import
 
18
 
 
19
"""Persistent maps from tuple_of_strings->string using CHK stores.
 
20
 
 
21
Overview and current status:
 
22
 
 
23
The CHKMap class implements a dict from tuple_of_strings->string by using a trie
 
24
with internal nodes of 8-bit fan out; The key tuples are mapped to strings by
 
25
joining them by \x00, and \x00 padding shorter keys out to the length of the
 
26
longest key. Leaf nodes are packed as densely as possible, and internal nodes
 
27
are all an additional 8-bits wide leading to a sparse upper tree.
 
28
 
 
29
Updates to a CHKMap are done preferentially via the apply_delta method, to
 
30
allow optimisation of the update operation; but individual map/unmap calls are
 
31
possible and supported. Individual changes via map/unmap are buffered in memory
 
32
until the _save method is called to force serialisation of the tree.
 
33
apply_delta records its changes immediately by performing an implicit _save.
 
34
 
 
35
TODO:
 
36
-----
 
37
 
 
38
Densely packed upper nodes.
 
39
 
 
40
"""
 
41
 
 
42
from __future__ import absolute_import
 
43
 
 
44
import heapq
 
45
import threading
 
46
 
 
47
from bzrlib import lazy_import
 
48
lazy_import.lazy_import(globals(), """
 
49
from bzrlib import (
 
50
    errors,
 
51
    )
 
52
""")
 
53
from bzrlib import (
 
54
    errors,
 
55
    lru_cache,
 
56
    osutils,
 
57
    registry,
 
58
    static_tuple,
 
59
    trace,
 
60
    )
 
61
from bzrlib.static_tuple import StaticTuple
 
62
 
 
63
# approx 4MB
 
64
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
 
65
# out, it takes 3.1MB to cache the layer.
 
66
_PAGE_CACHE_SIZE = 4*1024*1024
 
67
# Per thread caches for 2 reasons:
 
68
# - in the server we may be serving very different content, so we get less
 
69
#   cache thrashing.
 
70
# - we avoid locking on every cache lookup.
 
71
_thread_caches = threading.local()
 
72
# The page cache.
 
73
_thread_caches.page_cache = None
 
74
 
 
75
def _get_cache():
 
76
    """Get the per-thread page cache.
 
77
 
 
78
    We need a function to do this because in a new thread the _thread_caches
 
79
    threading.local object does not have the cache initialized yet.
 
80
    """
 
81
    page_cache = getattr(_thread_caches, 'page_cache', None)
 
82
    if page_cache is None:
 
83
        # We are caching bytes so len(value) is perfectly accurate
 
84
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
 
85
        _thread_caches.page_cache = page_cache
 
86
    return page_cache
 
87
 
 
88
 
 
89
def clear_cache():
 
90
    _get_cache().clear()
 
91
 
 
92
 
 
93
# If a ChildNode falls below this many bytes, we check for a remap
 
94
_INTERESTING_NEW_SIZE = 50
 
95
# If a ChildNode shrinks by more than this amount, we check for a remap
 
96
_INTERESTING_SHRINKAGE_LIMIT = 20
 
97
 
 
98
 
 
99
def _search_key_plain(key):
 
100
    """Map the key tuple into a search string that just uses the key bytes."""
 
101
    return '\x00'.join(key)
 
102
 
 
103
 
 
104
search_key_registry = registry.Registry()
 
105
search_key_registry.register('plain', _search_key_plain)
 
106
 
 
107
 
 
108
class CHKMap(object):
 
109
    """A persistent map from string to string backed by a CHK store."""
 
110
 
 
111
    __slots__ = ('_store', '_root_node', '_search_key_func')
 
112
 
 
113
    def __init__(self, store, root_key, search_key_func=None):
 
114
        """Create a CHKMap object.
 
115
 
 
116
        :param store: The store the CHKMap is stored in.
 
117
        :param root_key: The root key of the map. None to create an empty
 
118
            CHKMap.
 
119
        :param search_key_func: A function mapping a key => bytes. These bytes
 
120
            are then used by the internal nodes to split up leaf nodes into
 
121
            multiple pages.
 
122
        """
 
123
        self._store = store
 
124
        if search_key_func is None:
 
125
            search_key_func = _search_key_plain
 
126
        self._search_key_func = search_key_func
 
127
        if root_key is None:
 
128
            self._root_node = LeafNode(search_key_func=search_key_func)
 
129
        else:
 
130
            self._root_node = self._node_key(root_key)
 
131
 
 
132
    def apply_delta(self, delta):
 
133
        """Apply a delta to the map.
 
134
 
 
135
        :param delta: An iterable of old_key, new_key, new_value tuples.
 
136
            If new_key is not None, then new_key->new_value is inserted
 
137
            into the map; if old_key is not None, then the old mapping
 
138
            of old_key is removed.
 
139
        """
 
140
        has_deletes = False
 
141
        # Check preconditions first.
 
142
        as_st = StaticTuple.from_sequence
 
143
        new_items = set([as_st(key) for (old, key, value) in delta
 
144
                         if key is not None and old is None])
 
145
        existing_new = list(self.iteritems(key_filter=new_items))
 
146
        if existing_new:
 
147
            raise errors.InconsistentDeltaDelta(delta,
 
148
                "New items are already in the map %r." % existing_new)
 
149
        # Now apply changes.
 
150
        for old, new, value in delta:
 
151
            if old is not None and old != new:
 
152
                self.unmap(old, check_remap=False)
 
153
                has_deletes = True
 
154
        for old, new, value in delta:
 
155
            if new is not None:
 
156
                self.map(new, value)
 
157
        if has_deletes:
 
158
            self._check_remap()
 
159
        return self._save()
 
160
 
 
161
    def _ensure_root(self):
 
162
        """Ensure that the root node is an object not a key."""
 
163
        if type(self._root_node) is StaticTuple:
 
164
            # Demand-load the root
 
165
            self._root_node = self._get_node(self._root_node)
 
166
 
 
167
    def _get_node(self, node):
 
168
        """Get a node.
 
169
 
 
170
        Note that this does not update the _items dict in objects containing a
 
171
        reference to this node. As such it does not prevent subsequent IO being
 
172
        performed.
 
173
 
 
174
        :param node: A tuple key or node object.
 
175
        :return: A node object.
 
176
        """
 
177
        if type(node) is StaticTuple:
 
178
            bytes = self._read_bytes(node)
 
179
            return _deserialise(bytes, node,
 
180
                search_key_func=self._search_key_func)
 
181
        else:
 
182
            return node
 
183
 
 
184
    def _read_bytes(self, key):
 
185
        try:
 
186
            return _get_cache()[key]
 
187
        except KeyError:
 
188
            stream = self._store.get_record_stream([key], 'unordered', True)
 
189
            bytes = stream.next().get_bytes_as('fulltext')
 
190
            _get_cache()[key] = bytes
 
191
            return bytes
 
192
 
 
193
    def _dump_tree(self, include_keys=False):
 
194
        """Return the tree in a string representation."""
 
195
        self._ensure_root()
 
196
        res = self._dump_tree_node(self._root_node, prefix='', indent='',
 
197
                                   include_keys=include_keys)
 
198
        res.append('') # Give a trailing '\n'
 
199
        return '\n'.join(res)
 
200
 
 
201
    def _dump_tree_node(self, node, prefix, indent, include_keys=True):
 
202
        """For this node and all children, generate a string representation."""
 
203
        result = []
 
204
        if not include_keys:
 
205
            key_str = ''
 
206
        else:
 
207
            node_key = node.key()
 
208
            if node_key is not None:
 
209
                key_str = ' %s' % (node_key[0],)
 
210
            else:
 
211
                key_str = ' None'
 
212
        result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
 
213
                                     key_str))
 
214
        if type(node) is InternalNode:
 
215
            # Trigger all child nodes to get loaded
 
216
            list(node._iter_nodes(self._store))
 
217
            for prefix, sub in sorted(node._items.iteritems()):
 
218
                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
 
219
                                                   include_keys=include_keys))
 
220
        else:
 
221
            for key, value in sorted(node._items.iteritems()):
 
222
                # Don't use prefix nor indent here to line up when used in
 
223
                # tests in conjunction with assertEqualDiff
 
224
                result.append('      %r %r' % (tuple(key), value))
 
225
        return result
 
226
 
 
227
    @classmethod
 
228
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
 
229
        search_key_func=None):
 
230
        """Create a CHKMap in store with initial_value as the content.
 
231
 
 
232
        :param store: The store to record initial_value in, a VersionedFiles
 
233
            object with 1-tuple keys supporting CHK key generation.
 
234
        :param initial_value: A dict to store in store. Its keys and values
 
235
            must be bytestrings.
 
236
        :param maximum_size: The maximum_size rule to apply to nodes. This
 
237
            determines the size at which no new data is added to a single node.
 
238
        :param key_width: The number of elements in each key_tuple being stored
 
239
            in this map.
 
240
        :param search_key_func: A function mapping a key => bytes. These bytes
 
241
            are then used by the internal nodes to split up leaf nodes into
 
242
            multiple pages.
 
243
        :return: The root chk of the resulting CHKMap.
 
244
        """
 
245
        root_key = klass._create_directly(store, initial_value,
 
246
            maximum_size=maximum_size, key_width=key_width,
 
247
            search_key_func=search_key_func)
 
248
        if type(root_key) is not StaticTuple:
 
249
            raise AssertionError('we got a %s instead of a StaticTuple'
 
250
                                 % (type(root_key),))
 
251
        return root_key
 
252
 
 
253
    @classmethod
 
254
    def _create_via_map(klass, store, initial_value, maximum_size=0,
 
255
                        key_width=1, search_key_func=None):
 
256
        result = klass(store, None, search_key_func=search_key_func)
 
257
        result._root_node.set_maximum_size(maximum_size)
 
258
        result._root_node._key_width = key_width
 
259
        delta = []
 
260
        for key, value in initial_value.items():
 
261
            delta.append((None, key, value))
 
262
        root_key = result.apply_delta(delta)
 
263
        return root_key
 
264
 
 
265
    @classmethod
 
266
    def _create_directly(klass, store, initial_value, maximum_size=0,
 
267
                         key_width=1, search_key_func=None):
 
268
        node = LeafNode(search_key_func=search_key_func)
 
269
        node.set_maximum_size(maximum_size)
 
270
        node._key_width = key_width
 
271
        as_st = StaticTuple.from_sequence
 
272
        node._items = dict([(as_st(key), val) for key, val
 
273
                                               in initial_value.iteritems()])
 
274
        node._raw_size = sum([node._key_value_len(key, value)
 
275
                              for key,value in node._items.iteritems()])
 
276
        node._len = len(node._items)
 
277
        node._compute_search_prefix()
 
278
        node._compute_serialised_prefix()
 
279
        if (node._len > 1
 
280
            and maximum_size
 
281
            and node._current_size() > maximum_size):
 
282
            prefix, node_details = node._split(store)
 
283
            if len(node_details) == 1:
 
284
                raise AssertionError('Failed to split using node._split')
 
285
            node = InternalNode(prefix, search_key_func=search_key_func)
 
286
            node.set_maximum_size(maximum_size)
 
287
            node._key_width = key_width
 
288
            for split, subnode in node_details:
 
289
                node.add_node(split, subnode)
 
290
        keys = list(node.serialise(store))
 
291
        return keys[-1]
 
292
 
 
293
    def iter_changes(self, basis):
 
294
        """Iterate over the changes between basis and self.
 
295
 
 
296
        :return: An iterator of tuples: (key, old_value, new_value). Old_value
 
297
            is None for keys only in self; new_value is None for keys only in
 
298
            basis.
 
299
        """
 
300
        # Overview:
 
301
        # Read both trees in lexographic, highest-first order.
 
302
        # Any identical nodes we skip
 
303
        # Any unique prefixes we output immediately.
 
304
        # values in a leaf node are treated as single-value nodes in the tree
 
305
        # which allows them to be not-special-cased. We know to output them
 
306
        # because their value is a string, not a key(tuple) or node.
 
307
        #
 
308
        # corner cases to beware of when considering this function:
 
309
        # *) common references are at different heights.
 
310
        #    consider two trees:
 
311
        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
 
312
        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
 
313
        #                        'ab':LeafNode={'ab':'bar'}}
 
314
        #     'b': LeafNode={'b'}}
 
315
        #    the node with aaa/aab will only be encountered in the second tree
 
316
        #    after reading the 'a' subtree, but it is encountered in the first
 
317
        #    tree immediately. Variations on this may have read internal nodes
 
318
        #    like this.  we want to cut the entire pending subtree when we
 
319
        #    realise we have a common node.  For this we use a list of keys -
 
320
        #    the path to a node - and check the entire path is clean as we
 
321
        #    process each item.
 
322
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
 
323
            return
 
324
        self._ensure_root()
 
325
        basis._ensure_root()
 
326
        excluded_keys = set()
 
327
        self_node = self._root_node
 
328
        basis_node = basis._root_node
 
329
        # A heap, each element is prefix, node(tuple/NodeObject/string),
 
330
        # key_path (a list of tuples, tail-sharing down the tree.)
 
331
        self_pending = []
 
332
        basis_pending = []
 
333
        def process_node(node, path, a_map, pending):
 
334
            # take a node and expand it
 
335
            node = a_map._get_node(node)
 
336
            if type(node) == LeafNode:
 
337
                path = (node._key, path)
 
338
                for key, value in node._items.items():
 
339
                    # For a LeafNode, the key is a serialized_key, rather than
 
340
                    # a search_key, but the heap is using search_keys
 
341
                    search_key = node._search_key_func(key)
 
342
                    heapq.heappush(pending, (search_key, key, value, path))
 
343
            else:
 
344
                # type(node) == InternalNode
 
345
                path = (node._key, path)
 
346
                for prefix, child in node._items.items():
 
347
                    heapq.heappush(pending, (prefix, None, child, path))
 
348
        def process_common_internal_nodes(self_node, basis_node):
 
349
            self_items = set(self_node._items.items())
 
350
            basis_items = set(basis_node._items.items())
 
351
            path = (self_node._key, None)
 
352
            for prefix, child in self_items - basis_items:
 
353
                heapq.heappush(self_pending, (prefix, None, child, path))
 
354
            path = (basis_node._key, None)
 
355
            for prefix, child in basis_items - self_items:
 
356
                heapq.heappush(basis_pending, (prefix, None, child, path))
 
357
        def process_common_leaf_nodes(self_node, basis_node):
 
358
            self_items = set(self_node._items.items())
 
359
            basis_items = set(basis_node._items.items())
 
360
            path = (self_node._key, None)
 
361
            for key, value in self_items - basis_items:
 
362
                prefix = self._search_key_func(key)
 
363
                heapq.heappush(self_pending, (prefix, key, value, path))
 
364
            path = (basis_node._key, None)
 
365
            for key, value in basis_items - self_items:
 
366
                prefix = basis._search_key_func(key)
 
367
                heapq.heappush(basis_pending, (prefix, key, value, path))
 
368
        def process_common_prefix_nodes(self_node, self_path,
 
369
                                        basis_node, basis_path):
 
370
            # Would it be more efficient if we could request both at the same
 
371
            # time?
 
372
            self_node = self._get_node(self_node)
 
373
            basis_node = basis._get_node(basis_node)
 
374
            if (type(self_node) == InternalNode
 
375
                and type(basis_node) == InternalNode):
 
376
                # Matching internal nodes
 
377
                process_common_internal_nodes(self_node, basis_node)
 
378
            elif (type(self_node) == LeafNode
 
379
                  and type(basis_node) == LeafNode):
 
380
                process_common_leaf_nodes(self_node, basis_node)
 
381
            else:
 
382
                process_node(self_node, self_path, self, self_pending)
 
383
                process_node(basis_node, basis_path, basis, basis_pending)
 
384
        process_common_prefix_nodes(self_node, None, basis_node, None)
 
385
        self_seen = set()
 
386
        basis_seen = set()
 
387
        excluded_keys = set()
 
388
        def check_excluded(key_path):
 
389
            # Note that this is N^2, it depends on us trimming trees
 
390
            # aggressively to not become slow.
 
391
            # A better implementation would probably have a reverse map
 
392
            # back to the children of a node, and jump straight to it when
 
393
            # a common node is detected, the proceed to remove the already
 
394
            # pending children. bzrlib.graph has a searcher module with a
 
395
            # similar problem.
 
396
            while key_path is not None:
 
397
                key, key_path = key_path
 
398
                if key in excluded_keys:
 
399
                    return True
 
400
            return False
 
401
 
 
402
        loop_counter = 0
 
403
        while self_pending or basis_pending:
 
404
            loop_counter += 1
 
405
            if not self_pending:
 
406
                # self is exhausted: output remainder of basis
 
407
                for prefix, key, node, path in basis_pending:
 
408
                    if check_excluded(path):
 
409
                        continue
 
410
                    node = basis._get_node(node)
 
411
                    if key is not None:
 
412
                        # a value
 
413
                        yield (key, node, None)
 
414
                    else:
 
415
                        # subtree - fastpath the entire thing.
 
416
                        for key, value in node.iteritems(basis._store):
 
417
                            yield (key, value, None)
 
418
                return
 
419
            elif not basis_pending:
 
420
                # basis is exhausted: output remainder of self.
 
421
                for prefix, key, node, path in self_pending:
 
422
                    if check_excluded(path):
 
423
                        continue
 
424
                    node = self._get_node(node)
 
425
                    if key is not None:
 
426
                        # a value
 
427
                        yield (key, None, node)
 
428
                    else:
 
429
                        # subtree - fastpath the entire thing.
 
430
                        for key, value in node.iteritems(self._store):
 
431
                            yield (key, None, value)
 
432
                return
 
433
            else:
 
434
                # XXX: future optimisation - yield the smaller items
 
435
                # immediately rather than pushing everything on/off the
 
436
                # heaps. Applies to both internal nodes and leafnodes.
 
437
                if self_pending[0][0] < basis_pending[0][0]:
 
438
                    # expand self
 
439
                    prefix, key, node, path = heapq.heappop(self_pending)
 
440
                    if check_excluded(path):
 
441
                        continue
 
442
                    if key is not None:
 
443
                        # a value
 
444
                        yield (key, None, node)
 
445
                    else:
 
446
                        process_node(node, path, self, self_pending)
 
447
                        continue
 
448
                elif self_pending[0][0] > basis_pending[0][0]:
 
449
                    # expand basis
 
450
                    prefix, key, node, path = heapq.heappop(basis_pending)
 
451
                    if check_excluded(path):
 
452
                        continue
 
453
                    if key is not None:
 
454
                        # a value
 
455
                        yield (key, node, None)
 
456
                    else:
 
457
                        process_node(node, path, basis, basis_pending)
 
458
                        continue
 
459
                else:
 
460
                    # common prefix: possibly expand both
 
461
                    if self_pending[0][1] is None:
 
462
                        # process next self
 
463
                        read_self = True
 
464
                    else:
 
465
                        read_self = False
 
466
                    if basis_pending[0][1] is None:
 
467
                        # process next basis
 
468
                        read_basis = True
 
469
                    else:
 
470
                        read_basis = False
 
471
                    if not read_self and not read_basis:
 
472
                        # compare a common value
 
473
                        self_details = heapq.heappop(self_pending)
 
474
                        basis_details = heapq.heappop(basis_pending)
 
475
                        if self_details[2] != basis_details[2]:
 
476
                            yield (self_details[1],
 
477
                                basis_details[2], self_details[2])
 
478
                        continue
 
479
                    # At least one side wasn't a simple value
 
480
                    if (self._node_key(self_pending[0][2]) ==
 
481
                        self._node_key(basis_pending[0][2])):
 
482
                        # Identical pointers, skip (and don't bother adding to
 
483
                        # excluded, it won't turn up again.
 
484
                        heapq.heappop(self_pending)
 
485
                        heapq.heappop(basis_pending)
 
486
                        continue
 
487
                    # Now we need to expand this node before we can continue
 
488
                    if read_self and read_basis:
 
489
                        # Both sides start with the same prefix, so process
 
490
                        # them in parallel
 
491
                        self_prefix, _, self_node, self_path = heapq.heappop(
 
492
                            self_pending)
 
493
                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
 
494
                            basis_pending)
 
495
                        if self_prefix != basis_prefix:
 
496
                            raise AssertionError(
 
497
                                '%r != %r' % (self_prefix, basis_prefix))
 
498
                        process_common_prefix_nodes(
 
499
                            self_node, self_path,
 
500
                            basis_node, basis_path)
 
501
                        continue
 
502
                    if read_self:
 
503
                        prefix, key, node, path = heapq.heappop(self_pending)
 
504
                        if check_excluded(path):
 
505
                            continue
 
506
                        process_node(node, path, self, self_pending)
 
507
                    if read_basis:
 
508
                        prefix, key, node, path = heapq.heappop(basis_pending)
 
509
                        if check_excluded(path):
 
510
                            continue
 
511
                        process_node(node, path, basis, basis_pending)
 
512
        # print loop_counter
 
513
 
 
514
    def iteritems(self, key_filter=None):
 
515
        """Iterate over the entire CHKMap's contents."""
 
516
        self._ensure_root()
 
517
        if key_filter is not None:
 
518
            as_st = StaticTuple.from_sequence
 
519
            key_filter = [as_st(key) for key in key_filter]
 
520
        return self._root_node.iteritems(self._store, key_filter=key_filter)
 
521
 
 
522
    def key(self):
 
523
        """Return the key for this map."""
 
524
        if type(self._root_node) is StaticTuple:
 
525
            return self._root_node
 
526
        else:
 
527
            return self._root_node._key
 
528
 
 
529
    def __len__(self):
 
530
        self._ensure_root()
 
531
        return len(self._root_node)
 
532
 
 
533
    def map(self, key, value):
 
534
        """Map a key tuple to value.
 
535
        
 
536
        :param key: A key to map.
 
537
        :param value: The value to assign to key.
 
538
        """
 
539
        key = StaticTuple.from_sequence(key)
 
540
        # Need a root object.
 
541
        self._ensure_root()
 
542
        prefix, node_details = self._root_node.map(self._store, key, value)
 
543
        if len(node_details) == 1:
 
544
            self._root_node = node_details[0][1]
 
545
        else:
 
546
            self._root_node = InternalNode(prefix,
 
547
                                search_key_func=self._search_key_func)
 
548
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
 
549
            self._root_node._key_width = node_details[0][1]._key_width
 
550
            for split, node in node_details:
 
551
                self._root_node.add_node(split, node)
 
552
 
 
553
    def _node_key(self, node):
 
554
        """Get the key for a node whether it's a tuple or node."""
 
555
        if type(node) is tuple:
 
556
            node = StaticTuple.from_sequence(node)
 
557
        if type(node) is StaticTuple:
 
558
            return node
 
559
        else:
 
560
            return node._key
 
561
 
 
562
    def unmap(self, key, check_remap=True):
 
563
        """remove key from the map."""
 
564
        key = StaticTuple.from_sequence(key)
 
565
        self._ensure_root()
 
566
        if type(self._root_node) is InternalNode:
 
567
            unmapped = self._root_node.unmap(self._store, key,
 
568
                check_remap=check_remap)
 
569
        else:
 
570
            unmapped = self._root_node.unmap(self._store, key)
 
571
        self._root_node = unmapped
 
572
 
 
573
    def _check_remap(self):
 
574
        """Check if nodes can be collapsed."""
 
575
        self._ensure_root()
 
576
        if type(self._root_node) is InternalNode:
 
577
            self._root_node = self._root_node._check_remap(self._store)
 
578
 
 
579
    def _save(self):
 
580
        """Save the map completely.
 
581
 
 
582
        :return: The key of the root node.
 
583
        """
 
584
        if type(self._root_node) is StaticTuple:
 
585
            # Already saved.
 
586
            return self._root_node
 
587
        keys = list(self._root_node.serialise(self._store))
 
588
        return keys[-1]
 
589
 
 
590
 
 
591
class Node(object):
 
592
    """Base class defining the protocol for CHK Map nodes.
 
593
 
 
594
    :ivar _raw_size: The total size of the serialized key:value data, before
 
595
        adding the header bytes, and without prefix compression.
 
596
    """
 
597
 
 
598
    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
 
599
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
 
600
                )
 
601
 
 
602
    def __init__(self, key_width=1):
 
603
        """Create a node.
 
604
 
 
605
        :param key_width: The width of keys for this node.
 
606
        """
 
607
        self._key = None
 
608
        # Current number of elements
 
609
        self._len = 0
 
610
        self._maximum_size = 0
 
611
        self._key_width = key_width
 
612
        # current size in bytes
 
613
        self._raw_size = 0
 
614
        # The pointers/values this node has - meaning defined by child classes.
 
615
        self._items = {}
 
616
        # The common search prefix
 
617
        self._search_prefix = None
 
618
 
 
619
    def __repr__(self):
 
620
        items_str = str(sorted(self._items))
 
621
        if len(items_str) > 20:
 
622
            items_str = items_str[:16] + '...]'
 
623
        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
 
624
            self.__class__.__name__, self._key, self._len, self._raw_size,
 
625
            self._maximum_size, self._search_prefix, items_str)
 
626
 
 
627
    def key(self):
 
628
        return self._key
 
629
 
 
630
    def __len__(self):
 
631
        return self._len
 
632
 
 
633
    @property
 
634
    def maximum_size(self):
 
635
        """What is the upper limit for adding references to a node."""
 
636
        return self._maximum_size
 
637
 
 
638
    def set_maximum_size(self, new_size):
 
639
        """Set the size threshold for nodes.
 
640
 
 
641
        :param new_size: The size at which no data is added to a node. 0 for
 
642
            unlimited.
 
643
        """
 
644
        self._maximum_size = new_size
 
645
 
 
646
    @classmethod
 
647
    def common_prefix(cls, prefix, key):
 
648
        """Given 2 strings, return the longest prefix common to both.
 
649
 
 
650
        :param prefix: This has been the common prefix for other keys, so it is
 
651
            more likely to be the common prefix in this case as well.
 
652
        :param key: Another string to compare to
 
653
        """
 
654
        if key.startswith(prefix):
 
655
            return prefix
 
656
        pos = -1
 
657
        # Is there a better way to do this?
 
658
        for pos, (left, right) in enumerate(zip(prefix, key)):
 
659
            if left != right:
 
660
                pos -= 1
 
661
                break
 
662
        common = prefix[:pos+1]
 
663
        return common
 
664
 
 
665
    @classmethod
 
666
    def common_prefix_for_keys(cls, keys):
 
667
        """Given a list of keys, find their common prefix.
 
668
 
 
669
        :param keys: An iterable of strings.
 
670
        :return: The longest common prefix of all keys.
 
671
        """
 
672
        common_prefix = None
 
673
        for key in keys:
 
674
            if common_prefix is None:
 
675
                common_prefix = key
 
676
                continue
 
677
            common_prefix = cls.common_prefix(common_prefix, key)
 
678
            if not common_prefix:
 
679
                # if common_prefix is the empty string, then we know it won't
 
680
                # change further
 
681
                return ''
 
682
        return common_prefix
 
683
 
 
684
 
 
685
# Singleton indicating we have not computed _search_prefix yet
 
686
_unknown = object()
 
687
 
 
688
class LeafNode(Node):
 
689
    """A node containing actual key:value pairs.
 
690
 
 
691
    :ivar _items: A dict of key->value items. The key is in tuple form.
 
692
    :ivar _size: The number of bytes that would be used by serializing all of
 
693
        the key/value pairs.
 
694
    """
 
695
 
 
696
    __slots__ = ('_common_serialised_prefix',)
 
697
 
 
698
    def __init__(self, search_key_func=None):
 
699
        Node.__init__(self)
 
700
        # All of the keys in this leaf node share this common prefix
 
701
        self._common_serialised_prefix = None
 
702
        if search_key_func is None:
 
703
            self._search_key_func = _search_key_plain
 
704
        else:
 
705
            self._search_key_func = search_key_func
 
706
 
 
707
    def __repr__(self):
 
708
        items_str = str(sorted(self._items))
 
709
        if len(items_str) > 20:
 
710
            items_str = items_str[:16] + '...]'
 
711
        return \
 
712
            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
 
713
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
 
714
            self._maximum_size, self._search_prefix, self._key_width, items_str)
 
715
 
 
716
    def _current_size(self):
 
717
        """Answer the current serialised size of this node.
 
718
 
 
719
        This differs from self._raw_size in that it includes the bytes used for
 
720
        the header.
 
721
        """
 
722
        if self._common_serialised_prefix is None:
 
723
            bytes_for_items = 0
 
724
            prefix_len = 0
 
725
        else:
 
726
            # We will store a single string with the common prefix
 
727
            # And then that common prefix will not be stored in any of the
 
728
            # entry lines
 
729
            prefix_len = len(self._common_serialised_prefix)
 
730
            bytes_for_items = (self._raw_size - (prefix_len * self._len))
 
731
        return (9 # 'chkleaf:\n'
 
732
            + len(str(self._maximum_size)) + 1
 
733
            + len(str(self._key_width)) + 1
 
734
            + len(str(self._len)) + 1
 
735
            + prefix_len + 1
 
736
            + bytes_for_items)
 
737
 
 
738
    @classmethod
 
739
    def deserialise(klass, bytes, key, search_key_func=None):
 
740
        """Deserialise bytes, with key key, into a LeafNode.
 
741
 
 
742
        :param bytes: The bytes of the node.
 
743
        :param key: The key that the serialised node has.
 
744
        """
 
745
        key = static_tuple.expect_static_tuple(key)
 
746
        return _deserialise_leaf_node(bytes, key,
 
747
                                      search_key_func=search_key_func)
 
748
 
 
749
    def iteritems(self, store, key_filter=None):
 
750
        """Iterate over items in the node.
 
751
 
 
752
        :param key_filter: A filter to apply to the node. It should be a
 
753
            list/set/dict or similar repeatedly iterable container.
 
754
        """
 
755
        if key_filter is not None:
 
756
            # Adjust the filter - short elements go to a prefix filter. All
 
757
            # other items are looked up directly.
 
758
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
 
759
            filters = {}
 
760
            for key in key_filter:
 
761
                if len(key) == self._key_width:
 
762
                    # This filter is meant to match exactly one key, yield it
 
763
                    # if we have it.
 
764
                    try:
 
765
                        yield key, self._items[key]
 
766
                    except KeyError:
 
767
                        # This key is not present in this map, continue
 
768
                        pass
 
769
                else:
 
770
                    # Short items, we need to match based on a prefix
 
771
                    length_filter = filters.setdefault(len(key), set())
 
772
                    length_filter.add(key)
 
773
            if filters:
 
774
                filters = filters.items()
 
775
                for item in self._items.iteritems():
 
776
                    for length, length_filter in filters:
 
777
                        if item[0][:length] in length_filter:
 
778
                            yield item
 
779
                            break
 
780
        else:
 
781
            for item in self._items.iteritems():
 
782
                yield item
 
783
 
 
784
    def _key_value_len(self, key, value):
 
785
        # TODO: Should probably be done without actually joining the key, but
 
786
        #       then that can be done via the C extension
 
787
        return (len(self._serialise_key(key)) + 1
 
788
                + len(str(value.count('\n'))) + 1
 
789
                + len(value) + 1)
 
790
 
 
791
    def _search_key(self, key):
 
792
        return self._search_key_func(key)
 
793
 
 
794
    def _map_no_split(self, key, value):
 
795
        """Map a key to a value.
 
796
 
 
797
        This assumes either the key does not already exist, or you have already
 
798
        removed its size and length from self.
 
799
 
 
800
        :return: True if adding this node should cause us to split.
 
801
        """
 
802
        self._items[key] = value
 
803
        self._raw_size += self._key_value_len(key, value)
 
804
        self._len += 1
 
805
        serialised_key = self._serialise_key(key)
 
806
        if self._common_serialised_prefix is None:
 
807
            self._common_serialised_prefix = serialised_key
 
808
        else:
 
809
            self._common_serialised_prefix = self.common_prefix(
 
810
                self._common_serialised_prefix, serialised_key)
 
811
        search_key = self._search_key(key)
 
812
        if self._search_prefix is _unknown:
 
813
            self._compute_search_prefix()
 
814
        if self._search_prefix is None:
 
815
            self._search_prefix = search_key
 
816
        else:
 
817
            self._search_prefix = self.common_prefix(
 
818
                self._search_prefix, search_key)
 
819
        if (self._len > 1
 
820
            and self._maximum_size
 
821
            and self._current_size() > self._maximum_size):
 
822
            # Check to see if all of the search_keys for this node are
 
823
            # identical. We allow the node to grow under that circumstance
 
824
            # (we could track this as common state, but it is infrequent)
 
825
            if (search_key != self._search_prefix
 
826
                or not self._are_search_keys_identical()):
 
827
                return True
 
828
        return False
 
829
 
 
830
    def _split(self, store):
 
831
        """We have overflowed.
 
832
 
 
833
        Split this node into multiple LeafNodes, return it up the stack so that
 
834
        the next layer creates a new InternalNode and references the new nodes.
 
835
 
 
836
        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
 
837
        """
 
838
        if self._search_prefix is _unknown:
 
839
            raise AssertionError('Search prefix must be known')
 
840
        common_prefix = self._search_prefix
 
841
        split_at = len(common_prefix) + 1
 
842
        result = {}
 
843
        for key, value in self._items.iteritems():
 
844
            search_key = self._search_key(key)
 
845
            prefix = search_key[:split_at]
 
846
            # TODO: Generally only 1 key can be exactly the right length,
 
847
            #       which means we can only have 1 key in the node pointed
 
848
            #       at by the 'prefix\0' key. We might want to consider
 
849
            #       folding it into the containing InternalNode rather than
 
850
            #       having a fixed length-1 node.
 
851
            #       Note this is probably not true for hash keys, as they
 
852
            #       may get a '\00' node anywhere, but won't have keys of
 
853
            #       different lengths.
 
854
            if len(prefix) < split_at:
 
855
                prefix += '\x00'*(split_at - len(prefix))
 
856
            if prefix not in result:
 
857
                node = LeafNode(search_key_func=self._search_key_func)
 
858
                node.set_maximum_size(self._maximum_size)
 
859
                node._key_width = self._key_width
 
860
                result[prefix] = node
 
861
            else:
 
862
                node = result[prefix]
 
863
            sub_prefix, node_details = node.map(store, key, value)
 
864
            if len(node_details) > 1:
 
865
                if prefix != sub_prefix:
 
866
                    # This node has been split and is now found via a different
 
867
                    # path
 
868
                    result.pop(prefix)
 
869
                new_node = InternalNode(sub_prefix,
 
870
                    search_key_func=self._search_key_func)
 
871
                new_node.set_maximum_size(self._maximum_size)
 
872
                new_node._key_width = self._key_width
 
873
                for split, node in node_details:
 
874
                    new_node.add_node(split, node)
 
875
                result[prefix] = new_node
 
876
        return common_prefix, result.items()
 
877
 
 
878
    def map(self, store, key, value):
 
879
        """Map key to value."""
 
880
        if key in self._items:
 
881
            self._raw_size -= self._key_value_len(key, self._items[key])
 
882
            self._len -= 1
 
883
        self._key = None
 
884
        if self._map_no_split(key, value):
 
885
            return self._split(store)
 
886
        else:
 
887
            if self._search_prefix is _unknown:
 
888
                raise AssertionError('%r must be known' % self._search_prefix)
 
889
            return self._search_prefix, [("", self)]
 
890
 
 
891
    _serialise_key = '\x00'.join
 
892
 
 
893
    def serialise(self, store):
 
894
        """Serialise the LeafNode to store.
 
895
 
 
896
        :param store: A VersionedFiles honouring the CHK extensions.
 
897
        :return: An iterable of the keys inserted by this operation.
 
898
        """
 
899
        lines = ["chkleaf:\n"]
 
900
        lines.append("%d\n" % self._maximum_size)
 
901
        lines.append("%d\n" % self._key_width)
 
902
        lines.append("%d\n" % self._len)
 
903
        if self._common_serialised_prefix is None:
 
904
            lines.append('\n')
 
905
            if len(self._items) != 0:
 
906
                raise AssertionError('If _common_serialised_prefix is None'
 
907
                    ' we should have no items')
 
908
        else:
 
909
            lines.append('%s\n' % (self._common_serialised_prefix,))
 
910
            prefix_len = len(self._common_serialised_prefix)
 
911
        for key, value in sorted(self._items.items()):
 
912
            # Always add a final newline
 
913
            value_lines = osutils.chunks_to_lines([value + '\n'])
 
914
            serialized = "%s\x00%s\n" % (self._serialise_key(key),
 
915
                                         len(value_lines))
 
916
            if not serialized.startswith(self._common_serialised_prefix):
 
917
                raise AssertionError('We thought the common prefix was %r'
 
918
                    ' but entry %r does not have it in common'
 
919
                    % (self._common_serialised_prefix, serialized))
 
920
            lines.append(serialized[prefix_len:])
 
921
            lines.extend(value_lines)
 
922
        sha1, _, _ = store.add_lines((None,), (), lines)
 
923
        self._key = StaticTuple("sha1:" + sha1,).intern()
 
924
        bytes = ''.join(lines)
 
925
        if len(bytes) != self._current_size():
 
926
            raise AssertionError('Invalid _current_size')
 
927
        _get_cache()[self._key] = bytes
 
928
        return [self._key]
 
929
 
 
930
    def refs(self):
 
931
        """Return the references to other CHK's held by this node."""
 
932
        return []
 
933
 
 
934
    def _compute_search_prefix(self):
 
935
        """Determine the common search prefix for all keys in this node.
 
936
 
 
937
        :return: A bytestring of the longest search key prefix that is
 
938
            unique within this node.
 
939
        """
 
940
        search_keys = [self._search_key_func(key) for key in self._items]
 
941
        self._search_prefix = self.common_prefix_for_keys(search_keys)
 
942
        return self._search_prefix
 
943
 
 
944
    def _are_search_keys_identical(self):
 
945
        """Check to see if the search keys for all entries are the same.
 
946
 
 
947
        When using a hash as the search_key it is possible for non-identical
 
948
        keys to collide. If that happens enough, we may try overflow a
 
949
        LeafNode, but as all are collisions, we must not split.
 
950
        """
 
951
        common_search_key = None
 
952
        for key in self._items:
 
953
            search_key = self._search_key(key)
 
954
            if common_search_key is None:
 
955
                common_search_key = search_key
 
956
            elif search_key != common_search_key:
 
957
                return False
 
958
        return True
 
959
 
 
960
    def _compute_serialised_prefix(self):
 
961
        """Determine the common prefix for serialised keys in this node.
 
962
 
 
963
        :return: A bytestring of the longest serialised key prefix that is
 
964
            unique within this node.
 
965
        """
 
966
        serialised_keys = [self._serialise_key(key) for key in self._items]
 
967
        self._common_serialised_prefix = self.common_prefix_for_keys(
 
968
            serialised_keys)
 
969
        return self._common_serialised_prefix
 
970
 
 
971
    def unmap(self, store, key):
 
972
        """Unmap key from the node."""
 
973
        try:
 
974
            self._raw_size -= self._key_value_len(key, self._items[key])
 
975
        except KeyError:
 
976
            trace.mutter("key %s not found in %r", key, self._items)
 
977
            raise
 
978
        self._len -= 1
 
979
        del self._items[key]
 
980
        self._key = None
 
981
        # Recompute from scratch
 
982
        self._compute_search_prefix()
 
983
        self._compute_serialised_prefix()
 
984
        return self
 
985
 
 
986
 
 
987
class InternalNode(Node):
 
988
    """A node that contains references to other nodes.
 
989
 
 
990
    An InternalNode is responsible for mapping search key prefixes to child
 
991
    nodes.
 
992
 
 
993
    :ivar _items: serialised_key => node dictionary. node may be a tuple,
 
994
        LeafNode or InternalNode.
 
995
    """
 
996
 
 
997
    __slots__ = ('_node_width',)
 
998
 
 
999
    def __init__(self, prefix='', search_key_func=None):
 
1000
        Node.__init__(self)
 
1001
        # The size of an internalnode with default values and no children.
 
1002
        # How many octets key prefixes within this node are.
 
1003
        self._node_width = 0
 
1004
        self._search_prefix = prefix
 
1005
        if search_key_func is None:
 
1006
            self._search_key_func = _search_key_plain
 
1007
        else:
 
1008
            self._search_key_func = search_key_func
 
1009
 
 
1010
    def add_node(self, prefix, node):
 
1011
        """Add a child node with prefix prefix, and node node.
 
1012
 
 
1013
        :param prefix: The search key prefix for node.
 
1014
        :param node: The node being added.
 
1015
        """
 
1016
        if self._search_prefix is None:
 
1017
            raise AssertionError("_search_prefix should not be None")
 
1018
        if not prefix.startswith(self._search_prefix):
 
1019
            raise AssertionError("prefixes mismatch: %s must start with %s"
 
1020
                % (prefix,self._search_prefix))
 
1021
        if len(prefix) != len(self._search_prefix) + 1:
 
1022
            raise AssertionError("prefix wrong length: len(%s) is not %d" %
 
1023
                (prefix, len(self._search_prefix) + 1))
 
1024
        self._len += len(node)
 
1025
        if not len(self._items):
 
1026
            self._node_width = len(prefix)
 
1027
        if self._node_width != len(self._search_prefix) + 1:
 
1028
            raise AssertionError("node width mismatch: %d is not %d" %
 
1029
                (self._node_width, len(self._search_prefix) + 1))
 
1030
        self._items[prefix] = node
 
1031
        self._key = None
 
1032
 
 
1033
    def _current_size(self):
 
1034
        """Answer the current serialised size of this node."""
 
1035
        return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
 
1036
            len(str(self._maximum_size)))
 
1037
 
 
1038
    @classmethod
 
1039
    def deserialise(klass, bytes, key, search_key_func=None):
 
1040
        """Deserialise bytes to an InternalNode, with key key.
 
1041
 
 
1042
        :param bytes: The bytes of the node.
 
1043
        :param key: The key that the serialised node has.
 
1044
        :return: An InternalNode instance.
 
1045
        """
 
1046
        key = static_tuple.expect_static_tuple(key)
 
1047
        return _deserialise_internal_node(bytes, key,
 
1048
                                          search_key_func=search_key_func)
 
1049
 
 
1050
    def iteritems(self, store, key_filter=None):
 
1051
        for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
 
1052
            for item in node.iteritems(store, key_filter=node_filter):
 
1053
                yield item
 
1054
 
 
1055
    def _iter_nodes(self, store, key_filter=None, batch_size=None):
 
1056
        """Iterate over node objects which match key_filter.
 
1057
 
 
1058
        :param store: A store to use for accessing content.
 
1059
        :param key_filter: A key filter to filter nodes. Only nodes that might
 
1060
            contain a key in key_filter will be returned.
 
1061
        :param batch_size: If not None, then we will return the nodes that had
 
1062
            to be read using get_record_stream in batches, rather than reading
 
1063
            them all at once.
 
1064
        :return: An iterable of nodes. This function does not have to be fully
 
1065
            consumed.  (There will be no pending I/O when items are being returned.)
 
1066
        """
 
1067
        # Map from chk key ('sha1:...',) to (prefix, key_filter)
 
1068
        # prefix is the key in self._items to use, key_filter is the key_filter
 
1069
        # entries that would match this node
 
1070
        keys = {}
 
1071
        shortcut = False
 
1072
        if key_filter is None:
 
1073
            # yielding all nodes, yield whatever we have, and queue up a read
 
1074
            # for whatever we are missing
 
1075
            shortcut = True
 
1076
            for prefix, node in self._items.iteritems():
 
1077
                if node.__class__ is StaticTuple:
 
1078
                    keys[node] = (prefix, None)
 
1079
                else:
 
1080
                    yield node, None
 
1081
        elif len(key_filter) == 1:
 
1082
            # Technically, this path could also be handled by the first check
 
1083
            # in 'self._node_width' in length_filters. However, we can handle
 
1084
            # this case without spending any time building up the
 
1085
            # prefix_to_keys, etc state.
 
1086
 
 
1087
            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
 
1088
            # 0.626us   list(key_filter)[0]
 
1089
            #       is a func() for list(), 2 mallocs, and a getitem
 
1090
            # 0.489us   [k for k in key_filter][0]
 
1091
            #       still has the mallocs, avoids the func() call
 
1092
            # 0.350us   iter(key_filter).next()
 
1093
            #       has a func() call, and mallocs an iterator
 
1094
            # 0.125us   for key in key_filter: pass
 
1095
            #       no func() overhead, might malloc an iterator
 
1096
            # 0.105us   for key in key_filter: break
 
1097
            #       no func() overhead, might malloc an iterator, probably
 
1098
            #       avoids checking an 'else' clause as part of the for
 
1099
            for key in key_filter:
 
1100
                break
 
1101
            search_prefix = self._search_prefix_filter(key)
 
1102
            if len(search_prefix) == self._node_width:
 
1103
                # This item will match exactly, so just do a dict lookup, and
 
1104
                # see what we can return
 
1105
                shortcut = True
 
1106
                try:
 
1107
                    node = self._items[search_prefix]
 
1108
                except KeyError:
 
1109
                    # A given key can only match 1 child node, if it isn't
 
1110
                    # there, then we can just return nothing
 
1111
                    return
 
1112
                if node.__class__ is StaticTuple:
 
1113
                    keys[node] = (search_prefix, [key])
 
1114
                else:
 
1115
                    # This is loaded, and the only thing that can match,
 
1116
                    # return
 
1117
                    yield node, [key]
 
1118
                    return
 
1119
        if not shortcut:
 
1120
            # First, convert all keys into a list of search prefixes
 
1121
            # Aggregate common prefixes, and track the keys they come from
 
1122
            prefix_to_keys = {}
 
1123
            length_filters = {}
 
1124
            for key in key_filter:
 
1125
                search_prefix = self._search_prefix_filter(key)
 
1126
                length_filter = length_filters.setdefault(
 
1127
                                    len(search_prefix), set())
 
1128
                length_filter.add(search_prefix)
 
1129
                prefix_to_keys.setdefault(search_prefix, []).append(key)
 
1130
 
 
1131
            if (self._node_width in length_filters
 
1132
                and len(length_filters) == 1):
 
1133
                # all of the search prefixes match exactly _node_width. This
 
1134
                # means that everything is an exact match, and we can do a
 
1135
                # lookup into self._items, rather than iterating over the items
 
1136
                # dict.
 
1137
                search_prefixes = length_filters[self._node_width]
 
1138
                for search_prefix in search_prefixes:
 
1139
                    try:
 
1140
                        node = self._items[search_prefix]
 
1141
                    except KeyError:
 
1142
                        # We can ignore this one
 
1143
                        continue
 
1144
                    node_key_filter = prefix_to_keys[search_prefix]
 
1145
                    if node.__class__ is StaticTuple:
 
1146
                        keys[node] = (search_prefix, node_key_filter)
 
1147
                    else:
 
1148
                        yield node, node_key_filter
 
1149
            else:
 
1150
                # The slow way. We walk every item in self._items, and check to
 
1151
                # see if there are any matches
 
1152
                length_filters = length_filters.items()
 
1153
                for prefix, node in self._items.iteritems():
 
1154
                    node_key_filter = []
 
1155
                    for length, length_filter in length_filters:
 
1156
                        sub_prefix = prefix[:length]
 
1157
                        if sub_prefix in length_filter:
 
1158
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
 
1159
                    if node_key_filter: # this key matched something, yield it
 
1160
                        if node.__class__ is StaticTuple:
 
1161
                            keys[node] = (prefix, node_key_filter)
 
1162
                        else:
 
1163
                            yield node, node_key_filter
 
1164
        if keys:
 
1165
            # Look in the page cache for some more bytes
 
1166
            found_keys = set()
 
1167
            for key in keys:
 
1168
                try:
 
1169
                    bytes = _get_cache()[key]
 
1170
                except KeyError:
 
1171
                    continue
 
1172
                else:
 
1173
                    node = _deserialise(bytes, key,
 
1174
                        search_key_func=self._search_key_func)
 
1175
                    prefix, node_key_filter = keys[key]
 
1176
                    self._items[prefix] = node
 
1177
                    found_keys.add(key)
 
1178
                    yield node, node_key_filter
 
1179
            for key in found_keys:
 
1180
                del keys[key]
 
1181
        if keys:
 
1182
            # demand load some pages.
 
1183
            if batch_size is None:
 
1184
                # Read all the keys in
 
1185
                batch_size = len(keys)
 
1186
            key_order = list(keys)
 
1187
            for batch_start in range(0, len(key_order), batch_size):
 
1188
                batch = key_order[batch_start:batch_start + batch_size]
 
1189
                # We have to fully consume the stream so there is no pending
 
1190
                # I/O, so we buffer the nodes for now.
 
1191
                stream = store.get_record_stream(batch, 'unordered', True)
 
1192
                node_and_filters = []
 
1193
                for record in stream:
 
1194
                    bytes = record.get_bytes_as('fulltext')
 
1195
                    node = _deserialise(bytes, record.key,
 
1196
                        search_key_func=self._search_key_func)
 
1197
                    prefix, node_key_filter = keys[record.key]
 
1198
                    node_and_filters.append((node, node_key_filter))
 
1199
                    self._items[prefix] = node
 
1200
                    _get_cache()[record.key] = bytes
 
1201
                for info in node_and_filters:
 
1202
                    yield info
 
1203
 
 
1204
    def map(self, store, key, value):
 
1205
        """Map key to value."""
 
1206
        if not len(self._items):
 
1207
            raise AssertionError("can't map in an empty InternalNode.")
 
1208
        search_key = self._search_key(key)
 
1209
        if self._node_width != len(self._search_prefix) + 1:
 
1210
            raise AssertionError("node width mismatch: %d is not %d" %
 
1211
                (self._node_width, len(self._search_prefix) + 1))
 
1212
        if not search_key.startswith(self._search_prefix):
 
1213
            # This key doesn't fit in this index, so we need to split at the
 
1214
            # point where it would fit, insert self into that internal node,
 
1215
            # and then map this key into that node.
 
1216
            new_prefix = self.common_prefix(self._search_prefix,
 
1217
                                            search_key)
 
1218
            new_parent = InternalNode(new_prefix,
 
1219
                search_key_func=self._search_key_func)
 
1220
            new_parent.set_maximum_size(self._maximum_size)
 
1221
            new_parent._key_width = self._key_width
 
1222
            new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
 
1223
                                self)
 
1224
            return new_parent.map(store, key, value)
 
1225
        children = [node for node, _
 
1226
                          in self._iter_nodes(store, key_filter=[key])]
 
1227
        if children:
 
1228
            child = children[0]
 
1229
        else:
 
1230
            # new child needed:
 
1231
            child = self._new_child(search_key, LeafNode)
 
1232
        old_len = len(child)
 
1233
        if type(child) is LeafNode:
 
1234
            old_size = child._current_size()
 
1235
        else:
 
1236
            old_size = None
 
1237
        prefix, node_details = child.map(store, key, value)
 
1238
        if len(node_details) == 1:
 
1239
            # child may have shrunk, or might be a new node
 
1240
            child = node_details[0][1]
 
1241
            self._len = self._len - old_len + len(child)
 
1242
            self._items[search_key] = child
 
1243
            self._key = None
 
1244
            new_node = self
 
1245
            if type(child) is LeafNode:
 
1246
                if old_size is None:
 
1247
                    # The old node was an InternalNode which means it has now
 
1248
                    # collapsed, so we need to check if it will chain to a
 
1249
                    # collapse at this level.
 
1250
                    trace.mutter("checking remap as InternalNode -> LeafNode")
 
1251
                    new_node = self._check_remap(store)
 
1252
                else:
 
1253
                    # If the LeafNode has shrunk in size, we may want to run
 
1254
                    # a remap check. Checking for a remap is expensive though
 
1255
                    # and the frequency of a successful remap is very low.
 
1256
                    # Shrinkage by small amounts is common, so we only do the
 
1257
                    # remap check if the new_size is low or the shrinkage
 
1258
                    # amount is over a configurable limit.
 
1259
                    new_size = child._current_size()
 
1260
                    shrinkage = old_size - new_size
 
1261
                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
 
1262
                        or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
 
1263
                        trace.mutter(
 
1264
                            "checking remap as size shrunk by %d to be %d",
 
1265
                            shrinkage, new_size)
 
1266
                        new_node = self._check_remap(store)
 
1267
            if new_node._search_prefix is None:
 
1268
                raise AssertionError("_search_prefix should not be None")
 
1269
            return new_node._search_prefix, [('', new_node)]
 
1270
        # child has overflown - create a new intermediate node.
 
1271
        # XXX: This is where we might want to try and expand our depth
 
1272
        # to refer to more bytes of every child (which would give us
 
1273
        # multiple pointers to child nodes, but less intermediate nodes)
 
1274
        child = self._new_child(search_key, InternalNode)
 
1275
        child._search_prefix = prefix
 
1276
        for split, node in node_details:
 
1277
            child.add_node(split, node)
 
1278
        self._len = self._len - old_len + len(child)
 
1279
        self._key = None
 
1280
        return self._search_prefix, [("", self)]
 
1281
 
 
1282
    def _new_child(self, search_key, klass):
 
1283
        """Create a new child node of type klass."""
 
1284
        child = klass()
 
1285
        child.set_maximum_size(self._maximum_size)
 
1286
        child._key_width = self._key_width
 
1287
        child._search_key_func = self._search_key_func
 
1288
        self._items[search_key] = child
 
1289
        return child
 
1290
 
 
1291
    def serialise(self, store):
 
1292
        """Serialise the node to store.
 
1293
 
 
1294
        :param store: A VersionedFiles honouring the CHK extensions.
 
1295
        :return: An iterable of the keys inserted by this operation.
 
1296
        """
 
1297
        for node in self._items.itervalues():
 
1298
            if type(node) is StaticTuple:
 
1299
                # Never deserialised.
 
1300
                continue
 
1301
            if node._key is not None:
 
1302
                # Never altered
 
1303
                continue
 
1304
            for key in node.serialise(store):
 
1305
                yield key
 
1306
        lines = ["chknode:\n"]
 
1307
        lines.append("%d\n" % self._maximum_size)
 
1308
        lines.append("%d\n" % self._key_width)
 
1309
        lines.append("%d\n" % self._len)
 
1310
        if self._search_prefix is None:
 
1311
            raise AssertionError("_search_prefix should not be None")
 
1312
        lines.append('%s\n' % (self._search_prefix,))
 
1313
        prefix_len = len(self._search_prefix)
 
1314
        for prefix, node in sorted(self._items.items()):
 
1315
            if type(node) is StaticTuple:
 
1316
                key = node[0]
 
1317
            else:
 
1318
                key = node._key[0]
 
1319
            serialised = "%s\x00%s\n" % (prefix, key)
 
1320
            if not serialised.startswith(self._search_prefix):
 
1321
                raise AssertionError("prefixes mismatch: %s must start with %s"
 
1322
                    % (serialised, self._search_prefix))
 
1323
            lines.append(serialised[prefix_len:])
 
1324
        sha1, _, _ = store.add_lines((None,), (), lines)
 
1325
        self._key = StaticTuple("sha1:" + sha1,).intern()
 
1326
        _get_cache()[self._key] = ''.join(lines)
 
1327
        yield self._key
 
1328
 
 
1329
    def _search_key(self, key):
 
1330
        """Return the serialised key for key in this node."""
 
1331
        # search keys are fixed width. All will be self._node_width wide, so we
 
1332
        # pad as necessary.
 
1333
        return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
 
1334
 
 
1335
    def _search_prefix_filter(self, key):
 
1336
        """Serialise key for use as a prefix filter in iteritems."""
 
1337
        return self._search_key_func(key)[:self._node_width]
 
1338
 
 
1339
    def _split(self, offset):
 
1340
        """Split this node into smaller nodes starting at offset.
 
1341
 
 
1342
        :param offset: The offset to start the new child nodes at.
 
1343
        :return: An iterable of (prefix, node) tuples. prefix is a byte
 
1344
            prefix for reaching node.
 
1345
        """
 
1346
        if offset >= self._node_width:
 
1347
            for node in self._items.values():
 
1348
                for result in node._split(offset):
 
1349
                    yield result
 
1350
            return
 
1351
        for key, node in self._items.items():
 
1352
            pass
 
1353
 
 
1354
    def refs(self):
 
1355
        """Return the references to other CHK's held by this node."""
 
1356
        if self._key is None:
 
1357
            raise AssertionError("unserialised nodes have no refs.")
 
1358
        refs = []
 
1359
        for value in self._items.itervalues():
 
1360
            if type(value) is StaticTuple:
 
1361
                refs.append(value)
 
1362
            else:
 
1363
                refs.append(value.key())
 
1364
        return refs
 
1365
 
 
1366
    def _compute_search_prefix(self, extra_key=None):
 
1367
        """Return the unique key prefix for this node.
 
1368
 
 
1369
        :return: A bytestring of the longest search key prefix that is
 
1370
            unique within this node.
 
1371
        """
 
1372
        self._search_prefix = self.common_prefix_for_keys(self._items)
 
1373
        return self._search_prefix
 
1374
 
 
1375
    def unmap(self, store, key, check_remap=True):
 
1376
        """Remove key from this node and its children."""
 
1377
        if not len(self._items):
 
1378
            raise AssertionError("can't unmap in an empty InternalNode.")
 
1379
        children = [node for node, _
 
1380
                          in self._iter_nodes(store, key_filter=[key])]
 
1381
        if children:
 
1382
            child = children[0]
 
1383
        else:
 
1384
            raise KeyError(key)
 
1385
        self._len -= 1
 
1386
        unmapped = child.unmap(store, key)
 
1387
        self._key = None
 
1388
        search_key = self._search_key(key)
 
1389
        if len(unmapped) == 0:
 
1390
            # All child nodes are gone, remove the child:
 
1391
            del self._items[search_key]
 
1392
            unmapped = None
 
1393
        else:
 
1394
            # Stash the returned node
 
1395
            self._items[search_key] = unmapped
 
1396
        if len(self._items) == 1:
 
1397
            # this node is no longer needed:
 
1398
            return self._items.values()[0]
 
1399
        if type(unmapped) is InternalNode:
 
1400
            return self
 
1401
        if check_remap:
 
1402
            return self._check_remap(store)
 
1403
        else:
 
1404
            return self
 
1405
 
 
1406
    def _check_remap(self, store):
 
1407
        """Check if all keys contained by children fit in a single LeafNode.
 
1408
 
 
1409
        :param store: A store to use for reading more nodes
 
1410
        :return: Either self, or a new LeafNode which should replace self.
 
1411
        """
 
1412
        # Logic for how we determine when we need to rebuild
 
1413
        # 1) Implicitly unmap() is removing a key which means that the child
 
1414
        #    nodes are going to be shrinking by some extent.
 
1415
        # 2) If all children are LeafNodes, it is possible that they could be
 
1416
        #    combined into a single LeafNode, which can then completely replace
 
1417
        #    this internal node with a single LeafNode
 
1418
        # 3) If *one* child is an InternalNode, we assume it has already done
 
1419
        #    all the work to determine that its children cannot collapse, and
 
1420
        #    we can then assume that those nodes *plus* the current nodes don't
 
1421
        #    have a chance of collapsing either.
 
1422
        #    So a very cheap check is to just say if 'unmapped' is an
 
1423
        #    InternalNode, we don't have to check further.
 
1424
 
 
1425
        # TODO: Another alternative is to check the total size of all known
 
1426
        #       LeafNodes. If there is some formula we can use to determine the
 
1427
        #       final size without actually having to read in any more
 
1428
        #       children, it would be nice to have. However, we have to be
 
1429
        #       careful with stuff like nodes that pull out the common prefix
 
1430
        #       of each key, as adding a new key can change the common prefix
 
1431
        #       and cause size changes greater than the length of one key.
 
1432
        #       So for now, we just add everything to a new Leaf until it
 
1433
        #       splits, as we know that will give the right answer
 
1434
        new_leaf = LeafNode(search_key_func=self._search_key_func)
 
1435
        new_leaf.set_maximum_size(self._maximum_size)
 
1436
        new_leaf._key_width = self._key_width
 
1437
        # A batch_size of 16 was chosen because:
 
1438
        #   a) In testing, a 4k page held 14 times. So if we have more than 16
 
1439
        #      leaf nodes we are unlikely to hold them in a single new leaf
 
1440
        #      node. This still allows for 1 round trip
 
1441
        #   b) With 16-way fan out, we can still do a single round trip
 
1442
        #   c) With 255-way fan out, we don't want to read all 255 and destroy
 
1443
        #      the page cache, just to determine that we really don't need it.
 
1444
        for node, _ in self._iter_nodes(store, batch_size=16):
 
1445
            if type(node) is InternalNode:
 
1446
                # Without looking at any leaf nodes, we are sure
 
1447
                return self
 
1448
            for key, value in node._items.iteritems():
 
1449
                if new_leaf._map_no_split(key, value):
 
1450
                    return self
 
1451
        trace.mutter("remap generated a new LeafNode")
 
1452
        return new_leaf
 
1453
 
 
1454
 
 
1455
def _deserialise(bytes, key, search_key_func):
 
1456
    """Helper for repositorydetails - convert bytes to a node."""
 
1457
    if bytes.startswith("chkleaf:\n"):
 
1458
        node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
 
1459
    elif bytes.startswith("chknode:\n"):
 
1460
        node = InternalNode.deserialise(bytes, key,
 
1461
            search_key_func=search_key_func)
 
1462
    else:
 
1463
        raise AssertionError("Unknown node type.")
 
1464
    return node
 
1465
 
 
1466
 
 
1467
class CHKMapDifference(object):
 
1468
    """Iterate the stored pages and key,value pairs for (new - old).
 
1469
 
 
1470
    This class provides a generator over the stored CHK pages and the
 
1471
    (key, value) pairs that are in any of the new maps and not in any of the
 
1472
    old maps.
 
1473
 
 
1474
    Note that it may yield chk pages that are common (especially root nodes),
 
1475
    but it won't yield (key,value) pairs that are common.
 
1476
    """
 
1477
 
 
1478
    def __init__(self, store, new_root_keys, old_root_keys,
 
1479
                 search_key_func, pb=None):
 
1480
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
 
1481
        #       force callers to use StaticTuple, because there will often be
 
1482
        #       lots of keys passed in here. And even if we cast it locally,
 
1483
        #       that just meanst that we will have *both* a StaticTuple and a
 
1484
        #       tuple() in memory, referring to the same object. (so a net
 
1485
        #       increase in memory, not a decrease.)
 
1486
        self._store = store
 
1487
        self._new_root_keys = new_root_keys
 
1488
        self._old_root_keys = old_root_keys
 
1489
        self._pb = pb
 
1490
        # All uninteresting chks that we have seen. By the time they are added
 
1491
        # here, they should be either fully ignored, or queued up for
 
1492
        # processing
 
1493
        # TODO: This might grow to a large size if there are lots of merge
 
1494
        #       parents, etc. However, it probably doesn't scale to O(history)
 
1495
        #       like _processed_new_refs does.
 
1496
        self._all_old_chks = set(self._old_root_keys)
 
1497
        # All items that we have seen from the old_root_keys
 
1498
        self._all_old_items = set()
 
1499
        # These are interesting items which were either read, or already in the
 
1500
        # interesting queue (so we don't need to walk them again)
 
1501
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
 
1502
        #       SimpleSet here.
 
1503
        self._processed_new_refs = set()
 
1504
        self._search_key_func = search_key_func
 
1505
 
 
1506
        # The uninteresting and interesting nodes to be searched
 
1507
        self._old_queue = []
 
1508
        self._new_queue = []
 
1509
        # Holds the (key, value) items found when processing the root nodes,
 
1510
        # waiting for the uninteresting nodes to be walked
 
1511
        self._new_item_queue = []
 
1512
        self._state = None
 
1513
 
 
1514
    def _read_nodes_from_store(self, keys):
 
1515
        # We chose not to use _get_cache(), because we think in
 
1516
        # terms of records to be yielded. Also, we expect to touch each page
 
1517
        # only 1 time during this code. (We may want to evaluate saving the
 
1518
        # raw bytes into the page cache, which would allow a working tree
 
1519
        # update after the fetch to not have to read the bytes again.)
 
1520
        as_st = StaticTuple.from_sequence
 
1521
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1522
        for record in stream:
 
1523
            if self._pb is not None:
 
1524
                self._pb.tick()
 
1525
            if record.storage_kind == 'absent':
 
1526
                raise errors.NoSuchRevision(self._store, record.key)
 
1527
            bytes = record.get_bytes_as('fulltext')
 
1528
            node = _deserialise(bytes, record.key,
 
1529
                                search_key_func=self._search_key_func)
 
1530
            if type(node) is InternalNode:
 
1531
                # Note we don't have to do node.refs() because we know that
 
1532
                # there are no children that have been pushed into this node
 
1533
                # Note: Using as_st() here seemed to save 1.2MB, which would
 
1534
                #       indicate that we keep 100k prefix_refs around while
 
1535
                #       processing. They *should* be shorter lived than that...
 
1536
                #       It does cost us ~10s of processing time
 
1537
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
 
1538
                prefix_refs = node._items.items()
 
1539
                items = []
 
1540
            else:
 
1541
                prefix_refs = []
 
1542
                # Note: We don't use a StaticTuple here. Profiling showed a
 
1543
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
 
1544
                #       But a significant slowdown (15s / 145s, or 10%)
 
1545
                items = node._items.items()
 
1546
            yield record, node, prefix_refs, items
 
1547
 
 
1548
    def _read_old_roots(self):
 
1549
        old_chks_to_enqueue = []
 
1550
        all_old_chks = self._all_old_chks
 
1551
        for record, node, prefix_refs, items in \
 
1552
                self._read_nodes_from_store(self._old_root_keys):
 
1553
            # Uninteresting node
 
1554
            prefix_refs = [p_r for p_r in prefix_refs
 
1555
                                if p_r[1] not in all_old_chks]
 
1556
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1557
            all_old_chks.update(new_refs)
 
1558
            # TODO: This might be a good time to turn items into StaticTuple
 
1559
            #       instances and possibly intern them. However, this does not
 
1560
            #       impact 'initial branch' performance, so I'm not worrying
 
1561
            #       about this yet
 
1562
            self._all_old_items.update(items)
 
1563
            # Queue up the uninteresting references
 
1564
            # Don't actually put them in the 'to-read' queue until we have
 
1565
            # finished checking the interesting references
 
1566
            old_chks_to_enqueue.extend(prefix_refs)
 
1567
        return old_chks_to_enqueue
 
1568
 
 
1569
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1570
        # At this point, we have read all the uninteresting and interesting
 
1571
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1572
        # handled the interesting ones
 
1573
        for prefix, ref in old_chks_to_enqueue:
 
1574
            not_interesting = True
 
1575
            for i in xrange(len(prefix), 0, -1):
 
1576
                if prefix[:i] in new_prefixes:
 
1577
                    not_interesting = False
 
1578
                    break
 
1579
            if not_interesting:
 
1580
                # This prefix is not part of the remaining 'interesting set'
 
1581
                continue
 
1582
            self._old_queue.append(ref)
 
1583
 
 
1584
    def _read_all_roots(self):
 
1585
        """Read the root pages.
 
1586
 
 
1587
        This is structured as a generator, so that the root records can be
 
1588
        yielded up to whoever needs them without any buffering.
 
1589
        """
 
1590
        # This is the bootstrap phase
 
1591
        if not self._old_root_keys:
 
1592
            # With no old_root_keys we can just shortcut and be ready
 
1593
            # for _flush_new_queue
 
1594
            self._new_queue = list(self._new_root_keys)
 
1595
            return
 
1596
        old_chks_to_enqueue = self._read_old_roots()
 
1597
        # filter out any root keys that are already known to be uninteresting
 
1598
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1599
        # These are prefixes that are present in new_keys that we are
 
1600
        # thinking to yield
 
1601
        new_prefixes = set()
 
1602
        # We are about to yield all of these, so we don't want them getting
 
1603
        # added a second time
 
1604
        processed_new_refs = self._processed_new_refs
 
1605
        processed_new_refs.update(new_keys)
 
1606
        for record, node, prefix_refs, items in \
 
1607
                self._read_nodes_from_store(new_keys):
 
1608
            # At this level, we now know all the uninteresting references
 
1609
            # So we filter and queue up whatever is remaining
 
1610
            prefix_refs = [p_r for p_r in prefix_refs
 
1611
                           if p_r[1] not in self._all_old_chks
 
1612
                              and p_r[1] not in processed_new_refs]
 
1613
            refs = [p_r[1] for p_r in prefix_refs]
 
1614
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1615
            self._new_queue.extend(refs)
 
1616
            # TODO: We can potentially get multiple items here, however the
 
1617
            #       current design allows for this, as callers will do the work
 
1618
            #       to make the results unique. We might profile whether we
 
1619
            #       gain anything by ensuring unique return values for items
 
1620
            # TODO: This might be a good time to cast to StaticTuple, as
 
1621
            #       self._new_item_queue will hold the contents of multiple
 
1622
            #       records for an extended lifetime
 
1623
            new_items = [item for item in items
 
1624
                               if item not in self._all_old_items]
 
1625
            self._new_item_queue.extend(new_items)
 
1626
            new_prefixes.update([self._search_key_func(item[0])
 
1627
                                 for item in new_items])
 
1628
            processed_new_refs.update(refs)
 
1629
            yield record
 
1630
        # For new_prefixes we have the full length prefixes queued up.
 
1631
        # However, we also need possible prefixes. (If we have a known ref to
 
1632
        # 'ab', then we also need to include 'a'.) So expand the
 
1633
        # new_prefixes to include all shorter prefixes
 
1634
        for prefix in list(new_prefixes):
 
1635
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
 
1636
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1637
 
 
1638
    def _flush_new_queue(self):
 
1639
        # No need to maintain the heap invariant anymore, just pull things out
 
1640
        # and process them
 
1641
        refs = set(self._new_queue)
 
1642
        self._new_queue = []
 
1643
        # First pass, flush all interesting items and convert to using direct refs
 
1644
        all_old_chks = self._all_old_chks
 
1645
        processed_new_refs = self._processed_new_refs
 
1646
        all_old_items = self._all_old_items
 
1647
        new_items = [item for item in self._new_item_queue
 
1648
                           if item not in all_old_items]
 
1649
        self._new_item_queue = []
 
1650
        if new_items:
 
1651
            yield None, new_items
 
1652
        refs = refs.difference(all_old_chks)
 
1653
        processed_new_refs.update(refs)
 
1654
        while refs:
 
1655
            # TODO: Using a SimpleSet for self._processed_new_refs and
 
1656
            #       saved as much as 10MB of peak memory. However, it requires
 
1657
            #       implementing a non-pyrex version.
 
1658
            next_refs = set()
 
1659
            next_refs_update = next_refs.update
 
1660
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1661
            # from 1m54s to 1m51s. Consider it.
 
1662
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1663
                if all_old_items:
 
1664
                    # using the 'if' check saves about 145s => 141s, when
 
1665
                    # streaming initial branch of Launchpad data.
 
1666
                    items = [item for item in items
 
1667
                             if item not in all_old_items]
 
1668
                yield record, items
 
1669
                next_refs_update([p_r[1] for p_r in p_refs])
 
1670
                del p_refs
 
1671
            # set1.difference(set/dict) walks all of set1, and checks if it
 
1672
            # exists in 'other'.
 
1673
            # set1.difference(iterable) walks all of iterable, and does a
 
1674
            # 'difference_update' on a clone of set1. Pick wisely based on the
 
1675
            # expected sizes of objects.
 
1676
            # in our case it is expected that 'new_refs' will always be quite
 
1677
            # small.
 
1678
            next_refs = next_refs.difference(all_old_chks)
 
1679
            next_refs = next_refs.difference(processed_new_refs)
 
1680
            processed_new_refs.update(next_refs)
 
1681
            refs = next_refs
 
1682
 
 
1683
    def _process_next_old(self):
 
1684
        # Since we don't filter uninteresting any further than during
 
1685
        # _read_all_roots, process the whole queue in a single pass.
 
1686
        refs = self._old_queue
 
1687
        self._old_queue = []
 
1688
        all_old_chks = self._all_old_chks
 
1689
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1690
            # TODO: Use StaticTuple here?
 
1691
            self._all_old_items.update(items)
 
1692
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1693
            self._old_queue.extend(refs)
 
1694
            all_old_chks.update(refs)
 
1695
 
 
1696
    def _process_queues(self):
 
1697
        while self._old_queue:
 
1698
            self._process_next_old()
 
1699
        return self._flush_new_queue()
 
1700
 
 
1701
    def process(self):
 
1702
        for record in self._read_all_roots():
 
1703
            yield record, []
 
1704
        for record, items in self._process_queues():
 
1705
            yield record, items
 
1706
 
 
1707
 
 
1708
def iter_interesting_nodes(store, interesting_root_keys,
 
1709
                           uninteresting_root_keys, pb=None):
 
1710
    """Given root keys, find interesting nodes.
 
1711
 
 
1712
    Evaluate nodes referenced by interesting_root_keys. Ones that are also
 
1713
    referenced from uninteresting_root_keys are not considered interesting.
 
1714
 
 
1715
    :param interesting_root_keys: keys which should be part of the
 
1716
        "interesting" nodes (which will be yielded)
 
1717
    :param uninteresting_root_keys: keys which should be filtered out of the
 
1718
        result set.
 
1719
    :return: Yield
 
1720
        (interesting record, {interesting key:values})
 
1721
    """
 
1722
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1723
                                uninteresting_root_keys,
 
1724
                                search_key_func=store._search_key_func,
 
1725
                                pb=pb)
 
1726
    return iterator.process()
 
1727
 
 
1728
 
 
1729
try:
 
1730
    from bzrlib._chk_map_pyx import (
 
1731
        _bytes_to_text_key,
 
1732
        _search_key_16,
 
1733
        _search_key_255,
 
1734
        _deserialise_leaf_node,
 
1735
        _deserialise_internal_node,
 
1736
        )
 
1737
except ImportError, e:
 
1738
    osutils.failed_to_load_extension(e)
 
1739
    from bzrlib._chk_map_py import (
 
1740
        _bytes_to_text_key,
 
1741
        _search_key_16,
 
1742
        _search_key_255,
 
1743
        _deserialise_leaf_node,
 
1744
        _deserialise_internal_node,
 
1745
        )
 
1746
search_key_registry.register('hash-16-way', _search_key_16)
 
1747
search_key_registry.register('hash-255-way', _search_key_255)
 
1748
 
 
1749
 
 
1750
def _check_key(key):
 
1751
    """Helper function to assert that a key is properly formatted.
 
1752
 
 
1753
    This generally shouldn't be used in production code, but it can be helpful
 
1754
    to debug problems.
 
1755
    """
 
1756
    if type(key) is not StaticTuple:
 
1757
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
 
1758
    if len(key) != 1:
 
1759
        raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
 
1760
    if type(key[0]) is not str:
 
1761
        raise TypeError('key %r should hold a str, not %r'
 
1762
                        % (key, type(key[0])))
 
1763
    if not key[0].startswith('sha1:'):
 
1764
        raise ValueError('key %r should point to a sha1:' % (key,))
 
1765
 
 
1766