# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a
trie with internal nodes of 8-bit fan out; the key tuples are mapped to
strings by joining them with \x00, and \x00 padding shorter keys out to the
length of the longest key. Leaf nodes are packed as densely as possible, and
internal nodes are all an additional 8-bits wide, leading to a sparse upper
tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls are
possible and supported. Individual changes via map/unmap are buffered in
memory until the _save method is called to force serialisation of the tree.
apply_delta records its changes immediately by performing an implicit _save.

TODO: Densely packed upper nodes.
"""
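# A minimal usage sketch, assuming `chk_store` is a CHK-capable VersionedFiles
# object (for example a repository's chk_bytes store):
#
#   root_key = CHKMap.from_dict(chk_store, {('name',): 'value'},
#                               maximum_size=4096)
#   chkmap = CHKMap(chk_store, root_key)
#   chkmap.map(('other',), 'value2')   # buffered in memory
#   root_key = chkmap._save()          # serialise the tree, get the new root
#   sorted(chkmap.iteritems())         # key/value pairs for ('name',) and ('other',)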
from __future__ import absolute_import

from . import lazy_import
lazy_import.lazy_import(globals(), """
from .static_tuple import StaticTuple
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
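# For reference, the arithmetic behind the 3.1MB figure above:
#   255 pages * 255 lines/page * 50 bytes/line = 3,251,250 bytes ~= 3.1MiB,
# so the 4MiB _PAGE_CACHE_SIZE above comfortably holds one such layer.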
# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   cache thrashing.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
_thread_caches.page_cache = None
78
"""Get the per-thread page cache.
80
We need a function to do this because in a new thread the _thread_caches
81
threading.local object does not have the cache initialized yet.
83
page_cache = getattr(_thread_caches, 'page_cache', None)
84
if page_cache is None:
85
# We are caching bytes so len(value) is perfectly accurate
86
page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
87
_thread_caches.page_cache = page_cache
# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register('plain', _search_key_plain)
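# Example: how the 'plain' search key function flattens a key tuple, and how a
# registered function can be looked up again (an illustrative sketch; the
# Registry.get() lookup is assumed to return the registered callable):
#
#   _search_key_plain(('foo', 'bar'))        # -> 'foo\x00bar'
#   func = search_key_registry.get('plain')
#   func(('foo',))                           # -> 'foo'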
110
class CHKMap(object):
111
"""A persistent map from string to string backed by a CHK store."""
113
__slots__ = ('_store', '_root_node', '_search_key_func')
115
def __init__(self, store, root_key, search_key_func=None):
116
"""Create a CHKMap object.
118
:param store: The store the CHKMap is stored in.
119
:param root_key: The root key of the map. None to create an empty
121
:param search_key_func: A function mapping a key => bytes. These bytes
122
are then used by the internal nodes to split up leaf nodes into
126
if search_key_func is None:
127
search_key_func = _search_key_plain
128
self._search_key_func = search_key_func
130
self._root_node = LeafNode(search_key_func=search_key_func)
132
self._root_node = self._node_key(root_key)
134
def apply_delta(self, delta):
135
"""Apply a delta to the map.
137
:param delta: An iterable of old_key, new_key, new_value tuples.
138
If new_key is not None, then new_key->new_value is inserted
139
into the map; if old_key is not None, then the old mapping
140
of old_key is removed.
143
# Check preconditions first.
144
as_st = StaticTuple.from_sequence
145
new_items = {as_st(key) for (old, key, value) in delta
146
if key is not None and old is None}
147
existing_new = list(self.iteritems(key_filter=new_items))
149
raise errors.InconsistentDeltaDelta(delta,
150
"New items are already in the map %r." % existing_new)
152
for old, new, value in delta:
153
if old is not None and old != new:
154
self.unmap(old, check_remap=False)
156
for old, new, value in delta:
163
def _ensure_root(self):
164
"""Ensure that the root node is an object not a key."""
165
if isinstance(self._root_node, StaticTuple):
166
# Demand-load the root
167
self._root_node = self._get_node(self._root_node)
169
def _get_node(self, node):
172
Note that this does not update the _items dict in objects containing a
173
reference to this node. As such it does not prevent subsequent IO being
176
:param node: A tuple key or node object.
177
:return: A node object.
179
if isinstance(node, StaticTuple):
180
bytes = self._read_bytes(node)
181
return _deserialise(bytes, node,
182
search_key_func=self._search_key_func)
186
def _read_bytes(self, key):
188
return _get_cache()[key]
190
stream = self._store.get_record_stream([key], 'unordered', True)
191
bytes = stream.next().get_bytes_as('fulltext')
192
_get_cache()[key] = bytes
195
def _dump_tree(self, include_keys=False):
196
"""Return the tree in a string representation."""
198
res = self._dump_tree_node(self._root_node, prefix='', indent='',
199
include_keys=include_keys)
200
res.append('') # Give a trailing '\n'
201
return '\n'.join(res)
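# Illustrative _dump_tree() output for a small map that has split into an
# InternalNode with two LeafNode children (formatting is approximate):
#
#   '' InternalNode
#     'a' LeafNode
#       ('aaa',) 'v1'
#     'b' LeafNode
#       ('bbb',) 'v2'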
203
def _dump_tree_node(self, node, prefix, indent, include_keys=True):
204
"""For this node and all children, generate a string representation."""
209
node_key = node.key()
210
if node_key is not None:
211
key_str = ' %s' % (node_key[0],)
214
result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
216
if isinstance(node, InternalNode):
217
# Trigger all child nodes to get loaded
218
list(node._iter_nodes(self._store))
219
for prefix, sub in sorted(viewitems(node._items)):
220
result.extend(self._dump_tree_node(sub, prefix, indent + ' ',
221
include_keys=include_keys))
223
for key, value in sorted(viewitems(node._items)):
224
# Don't use prefix nor indent here to line up when used in
225
# tests in conjunction with assertEqualDiff
226
result.append(' %r %r' % (tuple(key), value))
230
def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
231
search_key_func=None):
232
"""Create a CHKMap in store with initial_value as the content.
234
:param store: The store to record initial_value in, a VersionedFiles
235
object with 1-tuple keys supporting CHK key generation.
236
:param initial_value: A dict to store in store. Its keys and values
238
:param maximum_size: The maximum_size rule to apply to nodes. This
239
determines the size at which no new data is added to a single node.
240
:param key_width: The number of elements in each key_tuple being stored
242
:param search_key_func: A function mapping a key => bytes. These bytes
243
are then used by the internal nodes to split up leaf nodes into
245
:return: The root chk of the resulting CHKMap.
247
root_key = klass._create_directly(store, initial_value,
248
maximum_size=maximum_size, key_width=key_width,
249
search_key_func=search_key_func)
250
if not isinstance(root_key, StaticTuple):
251
raise AssertionError('we got a %s instead of a StaticTuple'
256
def _create_via_map(klass, store, initial_value, maximum_size=0,
257
key_width=1, search_key_func=None):
258
result = klass(store, None, search_key_func=search_key_func)
259
result._root_node.set_maximum_size(maximum_size)
260
result._root_node._key_width = key_width
262
for key, value in viewitems(initial_value):
263
delta.append((None, key, value))
264
root_key = result.apply_delta(delta)
268
def _create_directly(klass, store, initial_value, maximum_size=0,
269
key_width=1, search_key_func=None):
270
node = LeafNode(search_key_func=search_key_func)
271
node.set_maximum_size(maximum_size)
272
node._key_width = key_width
273
as_st = StaticTuple.from_sequence
274
node._items = dict((as_st(key), val)
275
for key, val in viewitems(initial_value))
276
node._raw_size = sum(node._key_value_len(key, value)
277
for key, value in viewitems(node._items))
278
node._len = len(node._items)
279
node._compute_search_prefix()
280
node._compute_serialised_prefix()
283
and node._current_size() > maximum_size):
284
prefix, node_details = node._split(store)
285
if len(node_details) == 1:
286
raise AssertionError('Failed to split using node._split')
287
node = InternalNode(prefix, search_key_func=search_key_func)
288
node.set_maximum_size(maximum_size)
289
node._key_width = key_width
290
for split, subnode in node_details:
291
node.add_node(split, subnode)
292
keys = list(node.serialise(store))
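# Example: creating a map directly in a store, as a sketch (`chk_store` is
# assumed to be a CHK-capable VersionedFiles object). _create_directly builds
# a single LeafNode from the dict and only splits it when it exceeds
# maximum_size:
#
#   root_key = CHKMap.from_dict(
#       chk_store, {('file-id', 'rev-id'): 'entry bytes'},
#       maximum_size=4096, key_width=2)
#   # root_key is a StaticTuple of the form ('sha1:...',)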
295
def iter_changes(self, basis):
296
"""Iterate over the changes between basis and self.
298
:return: An iterator of tuples: (key, old_value, new_value). Old_value
299
is None for keys only in self; new_value is None for keys only in
303
# Read both trees in lexicographic, highest-first order.
304
# Any identical nodes we skip
305
# Any unique prefixes we output immediately.
306
# values in a leaf node are treated as single-value nodes in the tree
307
# which allows them to be not-special-cased. We know to output them
308
# because their value is a string, not a key(tuple) or node.
310
# corner cases to beware of when considering this function:
311
# *) common references are at different heights.
312
# consider two trees:
313
# {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
314
# {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
315
# 'ab':LeafNode={'ab':'bar'}}
316
# 'b': LeafNode={'b'}}
317
# the node with aaa/aab will only be encountered in the second tree
318
# after reading the 'a' subtree, but it is encountered in the first
319
# tree immediately. Variations on this may have read internal nodes
320
# like this. we want to cut the entire pending subtree when we
321
# realise we have a common node. For this we use a list of keys -
322
# the path to a node - and check the entire path is clean as we
324
if self._node_key(self._root_node) == self._node_key(basis._root_node):
328
excluded_keys = set()
329
self_node = self._root_node
330
basis_node = basis._root_node
331
# A heap, each element is prefix, node(tuple/NodeObject/string),
332
# key_path (a list of tuples, tail-sharing down the tree.)
335
def process_node(node, path, a_map, pending):
336
# take a node and expand it
337
node = a_map._get_node(node)
338
if isinstance(node, LeafNode):
339
path = (node._key, path)
340
for key, value in viewitems(node._items):
341
# For a LeafNode, the key is a serialized_key, rather than
342
# a search_key, but the heap is using search_keys
343
search_key = node._search_key_func(key)
344
heapq.heappush(pending, (search_key, key, value, path))
346
# type(node) == InternalNode
347
path = (node._key, path)
348
for prefix, child in viewitems(node._items):
349
heapq.heappush(pending, (prefix, None, child, path))
350
def process_common_internal_nodes(self_node, basis_node):
351
self_items = set(viewitems(self_node._items))
352
basis_items = set(viewitems(basis_node._items))
353
path = (self_node._key, None)
354
for prefix, child in self_items - basis_items:
355
heapq.heappush(self_pending, (prefix, None, child, path))
356
path = (basis_node._key, None)
357
for prefix, child in basis_items - self_items:
358
heapq.heappush(basis_pending, (prefix, None, child, path))
359
def process_common_leaf_nodes(self_node, basis_node):
360
self_items = set(viewitems(self_node._items))
361
basis_items = set(viewitems(basis_node._items))
362
path = (self_node._key, None)
363
for key, value in self_items - basis_items:
364
prefix = self._search_key_func(key)
365
heapq.heappush(self_pending, (prefix, key, value, path))
366
path = (basis_node._key, None)
367
for key, value in basis_items - self_items:
368
prefix = basis._search_key_func(key)
369
heapq.heappush(basis_pending, (prefix, key, value, path))
370
def process_common_prefix_nodes(self_node, self_path,
371
basis_node, basis_path):
372
# Would it be more efficient if we could request both at the same
374
self_node = self._get_node(self_node)
375
basis_node = basis._get_node(basis_node)
376
if (isinstance(self_node, InternalNode)
377
and isinstance(basis_node, InternalNode)):
378
# Matching internal nodes
379
process_common_internal_nodes(self_node, basis_node)
380
elif (isinstance(self_node, LeafNode)
381
and isinstance(basis_node, LeafNode)):
382
process_common_leaf_nodes(self_node, basis_node)
384
process_node(self_node, self_path, self, self_pending)
385
process_node(basis_node, basis_path, basis, basis_pending)
386
process_common_prefix_nodes(self_node, None, basis_node, None)
389
excluded_keys = set()
390
def check_excluded(key_path):
391
# Note that this is N^2, it depends on us trimming trees
392
# aggressively to not become slow.
393
# A better implementation would probably have a reverse map
394
# back to the children of a node, and jump straight to it when
395
# a common node is detected, then proceed to remove the already
396
# pending children. breezy.graph has a searcher module with a
398
while key_path is not None:
399
key, key_path = key_path
400
if key in excluded_keys:
405
while self_pending or basis_pending:
408
# self is exhausted: output remainder of basis
409
for prefix, key, node, path in basis_pending:
410
if check_excluded(path):
412
node = basis._get_node(node)
415
yield (key, node, None)
417
# subtree - fastpath the entire thing.
418
for key, value in node.iteritems(basis._store):
419
yield (key, value, None)
421
elif not basis_pending:
422
# basis is exhausted: output remainder of self.
423
for prefix, key, node, path in self_pending:
424
if check_excluded(path):
426
node = self._get_node(node)
429
yield (key, None, node)
431
# subtree - fastpath the entire thing.
432
for key, value in node.iteritems(self._store):
433
yield (key, None, value)
436
# XXX: future optimisation - yield the smaller items
437
# immediately rather than pushing everything on/off the
438
# heaps. Applies to both internal nodes and leafnodes.
439
if self_pending[0][0] < basis_pending[0][0]:
441
prefix, key, node, path = heapq.heappop(self_pending)
442
if check_excluded(path):
446
yield (key, None, node)
448
process_node(node, path, self, self_pending)
450
elif self_pending[0][0] > basis_pending[0][0]:
452
prefix, key, node, path = heapq.heappop(basis_pending)
453
if check_excluded(path):
457
yield (key, node, None)
459
process_node(node, path, basis, basis_pending)
462
# common prefix: possibly expand both
463
if self_pending[0][1] is None:
468
if basis_pending[0][1] is None:
473
if not read_self and not read_basis:
474
# compare a common value
475
self_details = heapq.heappop(self_pending)
476
basis_details = heapq.heappop(basis_pending)
477
if self_details[2] != basis_details[2]:
478
yield (self_details[1],
479
basis_details[2], self_details[2])
481
# At least one side wasn't a simple value
482
if (self._node_key(self_pending[0][2]) ==
483
self._node_key(basis_pending[0][2])):
484
# Identical pointers, skip (and don't bother adding to
485
# excluded, it won't turn up again.)
486
heapq.heappop(self_pending)
487
heapq.heappop(basis_pending)
489
# Now we need to expand this node before we can continue
490
if read_self and read_basis:
491
# Both sides start with the same prefix, so process
493
self_prefix, _, self_node, self_path = heapq.heappop(
495
basis_prefix, _, basis_node, basis_path = heapq.heappop(
497
if self_prefix != basis_prefix:
498
raise AssertionError(
499
'%r != %r' % (self_prefix, basis_prefix))
500
process_common_prefix_nodes(
501
self_node, self_path,
502
basis_node, basis_path)
505
prefix, key, node, path = heapq.heappop(self_pending)
506
if check_excluded(path):
508
process_node(node, path, self, self_pending)
510
prefix, key, node, path = heapq.heappop(basis_pending)
511
if check_excluded(path):
513
process_node(node, path, basis, basis_pending)
516
def iteritems(self, key_filter=None):
517
"""Iterate over the entire CHKMap's contents."""
519
if key_filter is not None:
520
as_st = StaticTuple.from_sequence
521
key_filter = [as_st(key) for key in key_filter]
522
return self._root_node.iteritems(self._store, key_filter=key_filter)
525
"""Return the key for this map."""
526
if isinstance(self._root_node, StaticTuple):
527
return self._root_node
529
return self._root_node._key
533
return len(self._root_node)
535
def map(self, key, value):
536
"""Map a key tuple to value.
538
:param key: A key to map.
539
:param value: The value to assign to key.
541
key = StaticTuple.from_sequence(key)
542
# Need a root object.
544
prefix, node_details = self._root_node.map(self._store, key, value)
545
if len(node_details) == 1:
546
self._root_node = node_details[0][1]
548
self._root_node = InternalNode(prefix,
549
search_key_func=self._search_key_func)
550
self._root_node.set_maximum_size(node_details[0][1].maximum_size)
551
self._root_node._key_width = node_details[0][1]._key_width
552
for split, node in node_details:
553
self._root_node.add_node(split, node)
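# Example of incremental updates, as a sketch: map()/unmap() only mutate the
# in-memory tree and _save() serialises it, while apply_delta() does both and
# records its changes immediately.
#
#   chkmap.map(('new-key',), 'new value')
#   chkmap.unmap(('old-key',))
#   root_key = chkmap._save()
#   # equivalently, as a single delta of (old_key, new_key, new_value) tuples:
#   root_key = chkmap.apply_delta([(('old-key',), None, None),
#                                  (None, ('new-key',), 'new value')])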
555
def _node_key(self, node):
556
"""Get the key for a node whether it's a tuple or node."""
557
if isinstance(node, tuple):
558
node = StaticTuple.from_sequence(node)
559
if isinstance(node, StaticTuple):
564
def unmap(self, key, check_remap=True):
565
"""remove key from the map."""
566
key = StaticTuple.from_sequence(key)
568
if isinstance(self._root_node, InternalNode):
569
unmapped = self._root_node.unmap(self._store, key,
570
check_remap=check_remap)
572
unmapped = self._root_node.unmap(self._store, key)
573
self._root_node = unmapped
575
def _check_remap(self):
576
"""Check if nodes can be collapsed."""
578
if isinstance(self._root_node, InternalNode):
579
self._root_node = self._root_node._check_remap(self._store)
582
"""Save the map completely.
584
:return: The key of the root node.
586
if isinstance(self._root_node, StaticTuple):
588
return self._root_node
589
keys = list(self._root_node.serialise(self._store))
594
"""Base class defining the protocol for CHK Map nodes.
596
:ivar _raw_size: The total size of the serialized key:value data, before
597
adding the header bytes, and without prefix compression.
600
__slots__ = ('_key', '_len', '_maximum_size', '_key_width',
601
'_raw_size', '_items', '_search_prefix', '_search_key_func'
604
def __init__(self, key_width=1):
607
:param key_width: The width of keys for this node.
610
# Current number of elements
612
self._maximum_size = 0
613
self._key_width = key_width
614
# current size in bytes
616
# The pointers/values this node has - meaning defined by child classes.
618
# The common search prefix
619
self._search_prefix = None
622
items_str = str(sorted(self._items))
623
if len(items_str) > 20:
624
items_str = items_str[:16] + '...]'
625
return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
626
self.__class__.__name__, self._key, self._len, self._raw_size,
627
self._maximum_size, self._search_prefix, items_str)
636
def maximum_size(self):
637
"""What is the upper limit for adding references to a node."""
638
return self._maximum_size
640
def set_maximum_size(self, new_size):
641
"""Set the size threshold for nodes.
643
:param new_size: The size at which no data is added to a node. 0 for
646
self._maximum_size = new_size
649
def common_prefix(cls, prefix, key):
650
"""Given 2 strings, return the longest prefix common to both.
652
:param prefix: This has been the common prefix for other keys, so it is
653
more likely to be the common prefix in this case as well.
654
:param key: Another string to compare to
656
if key.startswith(prefix):
659
# Is there a better way to do this?
660
for pos, (left, right) in enumerate(zip(prefix, key)):
664
common = prefix[:pos+1]
668
def common_prefix_for_keys(cls, keys):
669
"""Given a list of keys, find their common prefix.
671
:param keys: An iterable of strings.
672
:return: The longest common prefix of all keys.
676
if common_prefix is None:
679
common_prefix = cls.common_prefix(common_prefix, key)
680
if not common_prefix:
681
# if common_prefix is the empty string, then we know it won't
687
# Singleton indicating we have not computed _search_prefix yet
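# Example behaviour of the Node prefix helpers above (an illustrative sketch):
#
#   Node.common_prefix('begin', 'beginning')           # -> 'begin'
#   Node.common_prefix('begin', 'bet')                 # -> 'be'
#   Node.common_prefix_for_keys(['abc', 'abd', 'ab'])  # -> 'ab'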
690
class LeafNode(Node):
691
"""A node containing actual key:value pairs.
693
:ivar _items: A dict of key->value items. The key is in tuple form.
694
:ivar _size: The number of bytes that would be used by serializing all of
698
__slots__ = ('_common_serialised_prefix',)
700
def __init__(self, search_key_func=None):
702
# All of the keys in this leaf node share this common prefix
703
self._common_serialised_prefix = None
704
if search_key_func is None:
705
self._search_key_func = _search_key_plain
707
self._search_key_func = search_key_func
710
items_str = str(sorted(self._items))
711
if len(items_str) > 20:
712
items_str = items_str[:16] + '...]'
714
'%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
715
% (self.__class__.__name__, self._key, self._len, self._raw_size,
716
self._maximum_size, self._search_prefix, self._key_width, items_str)
718
def _current_size(self):
719
"""Answer the current serialised size of this node.
721
This differs from self._raw_size in that it includes the bytes used for
724
if self._common_serialised_prefix is None:
728
# We will store a single string with the common prefix
729
# And then that common prefix will not be stored in any of the
731
prefix_len = len(self._common_serialised_prefix)
732
bytes_for_items = (self._raw_size - (prefix_len * self._len))
733
return (9 # 'chkleaf:\n'
734
+ len(str(self._maximum_size)) + 1
735
+ len(str(self._key_width)) + 1
736
+ len(str(self._len)) + 1
741
def deserialise(klass, bytes, key, search_key_func=None):
742
"""Deserialise bytes, with key key, into a LeafNode.
744
:param bytes: The bytes of the node.
745
:param key: The key that the serialised node has.
747
key = static_tuple.expect_static_tuple(key)
748
return _deserialise_leaf_node(bytes, key,
749
search_key_func=search_key_func)
751
def iteritems(self, store, key_filter=None):
752
"""Iterate over items in the node.
754
:param key_filter: A filter to apply to the node. It should be a
755
list/set/dict or similar repeatedly iterable container.
757
if key_filter is not None:
758
# Adjust the filter - short elements go to a prefix filter. All
759
# other items are looked up directly.
760
# XXX: perhaps defaultdict? Profiling<rinse and repeat>
762
for key in key_filter:
763
if len(key) == self._key_width:
764
# This filter is meant to match exactly one key, yield it
767
yield key, self._items[key]
769
# This key is not present in this map, continue
772
# Short items, we need to match based on a prefix
773
filters.setdefault(len(key), set()).add(key)
775
filters_itemview = viewitems(filters)
776
for item in viewitems(self._items):
777
for length, length_filter in filters_itemview:
778
if item[0][:length] in length_filter:
782
for item in viewitems(self._items):
785
def _key_value_len(self, key, value):
786
# TODO: Should probably be done without actually joining the key, but
787
# then that can be done via the C extension
788
return (len(self._serialise_key(key)) + 1
789
+ len(str(value.count('\n'))) + 1
792
def _search_key(self, key):
793
return self._search_key_func(key)
795
def _map_no_split(self, key, value):
796
"""Map a key to a value.
798
This assumes either the key does not already exist, or you have already
799
removed its size and length from self.
801
:return: True if adding this node should cause us to split.
803
self._items[key] = value
804
self._raw_size += self._key_value_len(key, value)
806
serialised_key = self._serialise_key(key)
807
if self._common_serialised_prefix is None:
808
self._common_serialised_prefix = serialised_key
810
self._common_serialised_prefix = self.common_prefix(
811
self._common_serialised_prefix, serialised_key)
812
search_key = self._search_key(key)
813
if self._search_prefix is _unknown:
814
self._compute_search_prefix()
815
if self._search_prefix is None:
816
self._search_prefix = search_key
818
self._search_prefix = self.common_prefix(
819
self._search_prefix, search_key)
821
and self._maximum_size
822
and self._current_size() > self._maximum_size):
823
# Check to see if all of the search_keys for this node are
824
# identical. We allow the node to grow under that circumstance
825
# (we could track this as common state, but it is infrequent)
826
if (search_key != self._search_prefix
827
or not self._are_search_keys_identical()):
831
def _split(self, store):
832
"""We have overflowed.
834
Split this node into multiple LeafNodes, return it up the stack so that
835
the next layer creates a new InternalNode and references the new nodes.
837
:return: (common_serialised_prefix, [(node_serialised_prefix, node)])
839
if self._search_prefix is _unknown:
840
raise AssertionError('Search prefix must be known')
841
common_prefix = self._search_prefix
842
split_at = len(common_prefix) + 1
844
for key, value in viewitems(self._items):
845
search_key = self._search_key(key)
846
prefix = search_key[:split_at]
847
# TODO: Generally only 1 key can be exactly the right length,
848
# which means we can only have 1 key in the node pointed
849
# at by the 'prefix\0' key. We might want to consider
850
# folding it into the containing InternalNode rather than
851
# having a fixed length-1 node.
852
# Note this is probably not true for hash keys, as they
853
# may get a '\00' node anywhere, but won't have keys of
855
if len(prefix) < split_at:
856
prefix += '\x00'*(split_at - len(prefix))
857
if prefix not in result:
858
node = LeafNode(search_key_func=self._search_key_func)
859
node.set_maximum_size(self._maximum_size)
860
node._key_width = self._key_width
861
result[prefix] = node
863
node = result[prefix]
864
sub_prefix, node_details = node.map(store, key, value)
865
if len(node_details) > 1:
866
if prefix != sub_prefix:
867
# This node has been split and is now found via a different
870
new_node = InternalNode(sub_prefix,
871
search_key_func=self._search_key_func)
872
new_node.set_maximum_size(self._maximum_size)
873
new_node._key_width = self._key_width
874
for split, node in node_details:
875
new_node.add_node(split, node)
876
result[prefix] = new_node
877
return common_prefix, list(viewitems(result))
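# Illustrative sketch of a split with the plain search key function: if this
# leaf holds ('aaa',), ('aab',) and ('abb',), the common search prefix is 'a',
# split_at is 2, and the keys regroup under the child prefixes 'aa' and 'ab':
#
#   ('a', [('aa', <LeafNode with ('aaa',) and ('aab',)>),
#          ('ab', <LeafNode with ('abb',)>)])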
879
def map(self, store, key, value):
880
"""Map key to value."""
881
if key in self._items:
882
self._raw_size -= self._key_value_len(key, self._items[key])
885
if self._map_no_split(key, value):
886
return self._split(store)
888
if self._search_prefix is _unknown:
889
raise AssertionError('%r must be known' % self._search_prefix)
890
return self._search_prefix, [("", self)]
892
_serialise_key = '\x00'.join
894
def serialise(self, store):
895
"""Serialise the LeafNode to store.
897
:param store: A VersionedFiles honouring the CHK extensions.
898
:return: An iterable of the keys inserted by this operation.
900
lines = ["chkleaf:\n"]
901
lines.append("%d\n" % self._maximum_size)
902
lines.append("%d\n" % self._key_width)
903
lines.append("%d\n" % self._len)
904
if self._common_serialised_prefix is None:
906
if len(self._items) != 0:
907
raise AssertionError('If _common_serialised_prefix is None'
908
' we should have no items')
910
lines.append('%s\n' % (self._common_serialised_prefix,))
911
prefix_len = len(self._common_serialised_prefix)
912
for key, value in sorted(viewitems(self._items)):
913
# Always add a final newline
914
value_lines = osutils.chunks_to_lines([value + '\n'])
915
serialized = "%s\x00%s\n" % (self._serialise_key(key),
917
if not serialized.startswith(self._common_serialised_prefix):
918
raise AssertionError('We thought the common prefix was %r'
919
' but entry %r does not have it in common'
920
% (self._common_serialised_prefix, serialized))
921
lines.append(serialized[prefix_len:])
922
lines.extend(value_lines)
923
sha1, _, _ = store.add_lines((None,), (), lines)
924
self._key = StaticTuple("sha1:" + sha1,).intern()
925
bytes = ''.join(lines)
926
if len(bytes) != self._current_size():
927
raise AssertionError('Invalid _current_size')
928
_get_cache()[self._key] = bytes
932
"""Return the references to other CHK's held by this node."""
935
def _compute_search_prefix(self):
936
"""Determine the common search prefix for all keys in this node.
938
:return: A bytestring of the longest search key prefix that is
939
unique within this node.
941
search_keys = [self._search_key_func(key) for key in self._items]
942
self._search_prefix = self.common_prefix_for_keys(search_keys)
943
return self._search_prefix
945
def _are_search_keys_identical(self):
946
"""Check to see if the search keys for all entries are the same.
948
When using a hash as the search_key it is possible for non-identical
949
keys to collide. If that happens enough, we may try to overflow a
950
LeafNode, but as all are collisions, we must not split.
952
common_search_key = None
953
for key in self._items:
954
search_key = self._search_key(key)
955
if common_search_key is None:
956
common_search_key = search_key
957
elif search_key != common_search_key:
961
def _compute_serialised_prefix(self):
962
"""Determine the common prefix for serialised keys in this node.
964
:return: A bytestring of the longest serialised key prefix that is
965
unique within this node.
967
serialised_keys = [self._serialise_key(key) for key in self._items]
968
self._common_serialised_prefix = self.common_prefix_for_keys(
970
return self._common_serialised_prefix
972
def unmap(self, store, key):
973
"""Unmap key from the node."""
975
self._raw_size -= self._key_value_len(key, self._items[key])
977
trace.mutter("key %s not found in %r", key, self._items)
982
# Recompute from scratch
983
self._compute_search_prefix()
984
self._compute_serialised_prefix()
988
class InternalNode(Node):
989
"""A node that contains references to other nodes.
991
An InternalNode is responsible for mapping search key prefixes to child
994
:ivar _items: serialised_key => node dictionary. node may be a tuple,
995
LeafNode or InternalNode.
998
__slots__ = ('_node_width',)
1000
def __init__(self, prefix='', search_key_func=None):
1002
# The size of an internalnode with default values and no children.
1003
# How many octets key prefixes within this node are.
1004
self._node_width = 0
1005
self._search_prefix = prefix
1006
if search_key_func is None:
1007
self._search_key_func = _search_key_plain
1009
self._search_key_func = search_key_func
1011
def add_node(self, prefix, node):
1012
"""Add a child node with prefix prefix, and node node.
1014
:param prefix: The search key prefix for node.
1015
:param node: The node being added.
1017
if self._search_prefix is None:
1018
raise AssertionError("_search_prefix should not be None")
1019
if not prefix.startswith(self._search_prefix):
1020
raise AssertionError("prefixes mismatch: %s must start with %s"
1021
% (prefix, self._search_prefix))
1022
if len(prefix) != len(self._search_prefix) + 1:
1023
raise AssertionError("prefix wrong length: len(%s) is not %d" %
1024
(prefix, len(self._search_prefix) + 1))
1025
self._len += len(node)
1026
if not len(self._items):
1027
self._node_width = len(prefix)
1028
if self._node_width != len(self._search_prefix) + 1:
1029
raise AssertionError("node width mismatch: %d is not %d" %
1030
(self._node_width, len(self._search_prefix) + 1))
1031
self._items[prefix] = node
1034
def _current_size(self):
1035
"""Answer the current serialised size of this node."""
1036
return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
1037
len(str(self._maximum_size)))
1040
def deserialise(klass, bytes, key, search_key_func=None):
1041
"""Deserialise bytes to an InternalNode, with key key.
1043
:param bytes: The bytes of the node.
1044
:param key: The key that the serialised node has.
1045
:return: An InternalNode instance.
1047
key = static_tuple.expect_static_tuple(key)
1048
return _deserialise_internal_node(bytes, key,
1049
search_key_func=search_key_func)
1051
def iteritems(self, store, key_filter=None):
1052
for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1053
for item in node.iteritems(store, key_filter=node_filter):
1056
def _iter_nodes(self, store, key_filter=None, batch_size=None):
1057
"""Iterate over node objects which match key_filter.
1059
:param store: A store to use for accessing content.
1060
:param key_filter: A key filter to filter nodes. Only nodes that might
1061
contain a key in key_filter will be returned.
1062
:param batch_size: If not None, then we will return the nodes that had
1063
to be read using get_record_stream in batches, rather than reading
1065
:return: An iterable of nodes. This function does not have to be fully
1066
consumed. (There will be no pending I/O when items are being returned.)
1068
# Map from chk key ('sha1:...',) to (prefix, key_filter)
1069
# prefix is the key in self._items to use, key_filter is the key_filter
1070
# entries that would match this node
1073
if key_filter is None:
1074
# yielding all nodes, yield whatever we have, and queue up a read
1075
# for whatever we are missing
1077
for prefix, node in viewitems(self._items):
1078
if node.__class__ is StaticTuple:
1079
keys[node] = (prefix, None)
1082
elif len(key_filter) == 1:
1083
# Technically, this path could also be handled by the first check
1084
# in 'self._node_width' in length_filters. However, we can handle
1085
# this case without spending any time building up the
1086
# prefix_to_keys, etc state.
1088
# This is a bit ugly, but TIMEIT showed it to be by far the fastest
1089
# 0.626us list(key_filter)[0]
1090
# is a func() for list(), 2 mallocs, and a getitem
1091
# 0.489us [k for k in key_filter][0]
1092
# still has the mallocs, avoids the func() call
1093
# 0.350us iter(key_filter).next()
1094
# has a func() call, and mallocs an iterator
1095
# 0.125us for key in key_filter: pass
1096
# no func() overhead, might malloc an iterator
1097
# 0.105us for key in key_filter: break
1098
# no func() overhead, might malloc an iterator, probably
1099
# avoids checking an 'else' clause as part of the for
1100
for key in key_filter:
1102
search_prefix = self._search_prefix_filter(key)
1103
if len(search_prefix) == self._node_width:
1104
# This item will match exactly, so just do a dict lookup, and
1105
# see what we can return
1108
node = self._items[search_prefix]
1110
# A given key can only match 1 child node, if it isn't
1111
# there, then we can just return nothing
1113
if node.__class__ is StaticTuple:
1114
keys[node] = (search_prefix, [key])
1116
# This is loaded, and the only thing that can match,
1121
# First, convert all keys into a list of search prefixes
1122
# Aggregate common prefixes, and track the keys they come from
1125
for key in key_filter:
1126
search_prefix = self._search_prefix_filter(key)
1127
length_filter = length_filters.setdefault(
1128
len(search_prefix), set())
1129
length_filter.add(search_prefix)
1130
prefix_to_keys.setdefault(search_prefix, []).append(key)
1132
if (self._node_width in length_filters
1133
and len(length_filters) == 1):
1134
# all of the search prefixes match exactly _node_width. This
1135
# means that everything is an exact match, and we can do a
1136
# lookup into self._items, rather than iterating over the items
1138
search_prefixes = length_filters[self._node_width]
1139
for search_prefix in search_prefixes:
1141
node = self._items[search_prefix]
1143
# We can ignore this one
1145
node_key_filter = prefix_to_keys[search_prefix]
1146
if node.__class__ is StaticTuple:
1147
keys[node] = (search_prefix, node_key_filter)
1149
yield node, node_key_filter
1151
# The slow way. We walk every item in self._items, and check to
1152
# see if there are any matches
1153
length_filters_itemview = viewitems(length_filters)
1154
for prefix, node in viewitems(self._items):
1155
node_key_filter = []
1156
for length, length_filter in length_filters_itemview:
1157
sub_prefix = prefix[:length]
1158
if sub_prefix in length_filter:
1159
node_key_filter.extend(prefix_to_keys[sub_prefix])
1160
if node_key_filter: # this key matched something, yield it
1161
if node.__class__ is StaticTuple:
1162
keys[node] = (prefix, node_key_filter)
1164
yield node, node_key_filter
1166
# Look in the page cache for some more bytes
1170
bytes = _get_cache()[key]
1174
node = _deserialise(bytes, key,
1175
search_key_func=self._search_key_func)
1176
prefix, node_key_filter = keys[key]
1177
self._items[prefix] = node
1179
yield node, node_key_filter
1180
for key in found_keys:
1183
# demand load some pages.
1184
if batch_size is None:
1185
# Read all the keys in
1186
batch_size = len(keys)
1187
key_order = list(keys)
1188
for batch_start in range(0, len(key_order), batch_size):
1189
batch = key_order[batch_start:batch_start + batch_size]
1190
# We have to fully consume the stream so there is no pending
1191
# I/O, so we buffer the nodes for now.
1192
stream = store.get_record_stream(batch, 'unordered', True)
1193
node_and_filters = []
1194
for record in stream:
1195
bytes = record.get_bytes_as('fulltext')
1196
node = _deserialise(bytes, record.key,
1197
search_key_func=self._search_key_func)
1198
prefix, node_key_filter = keys[record.key]
1199
node_and_filters.append((node, node_key_filter))
1200
self._items[prefix] = node
1201
_get_cache()[record.key] = bytes
1202
for info in node_and_filters:
1205
def map(self, store, key, value):
1206
"""Map key to value."""
1207
if not len(self._items):
1208
raise AssertionError("can't map in an empty InternalNode.")
1209
search_key = self._search_key(key)
1210
if self._node_width != len(self._search_prefix) + 1:
1211
raise AssertionError("node width mismatch: %d is not %d" %
1212
(self._node_width, len(self._search_prefix) + 1))
1213
if not search_key.startswith(self._search_prefix):
1214
# This key doesn't fit in this index, so we need to split at the
1215
# point where it would fit, insert self into that internal node,
1216
# and then map this key into that node.
1217
new_prefix = self.common_prefix(self._search_prefix,
1219
new_parent = InternalNode(new_prefix,
1220
search_key_func=self._search_key_func)
1221
new_parent.set_maximum_size(self._maximum_size)
1222
new_parent._key_width = self._key_width
1223
new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
1225
return new_parent.map(store, key, value)
1226
children = [node for node, _
1227
in self._iter_nodes(store, key_filter=[key])]
1232
child = self._new_child(search_key, LeafNode)
1233
old_len = len(child)
1234
if isinstance(child, LeafNode):
1235
old_size = child._current_size()
1238
prefix, node_details = child.map(store, key, value)
1239
if len(node_details) == 1:
1240
# child may have shrunk, or might be a new node
1241
child = node_details[0][1]
1242
self._len = self._len - old_len + len(child)
1243
self._items[search_key] = child
1246
if isinstance(child, LeafNode):
1247
if old_size is None:
1248
# The old node was an InternalNode which means it has now
1249
# collapsed, so we need to check if it will chain to a
1250
# collapse at this level.
1251
trace.mutter("checking remap as InternalNode -> LeafNode")
1252
new_node = self._check_remap(store)
1254
# If the LeafNode has shrunk in size, we may want to run
1255
# a remap check. Checking for a remap is expensive though
1256
# and the frequency of a successful remap is very low.
1257
# Shrinkage by small amounts is common, so we only do the
1258
# remap check if the new_size is low or the shrinkage
1259
# amount is over a configurable limit.
1260
new_size = child._current_size()
1261
shrinkage = old_size - new_size
1262
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
1263
or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1265
"checking remap as size shrunk by %d to be %d",
1266
shrinkage, new_size)
1267
new_node = self._check_remap(store)
1268
if new_node._search_prefix is None:
1269
raise AssertionError("_search_prefix should not be None")
1270
return new_node._search_prefix, [('', new_node)]
1271
# child has overflown - create a new intermediate node.
1272
# XXX: This is where we might want to try and expand our depth
1273
# to refer to more bytes of every child (which would give us
1274
# multiple pointers to child nodes, but less intermediate nodes)
1275
child = self._new_child(search_key, InternalNode)
1276
child._search_prefix = prefix
1277
for split, node in node_details:
1278
child.add_node(split, node)
1279
self._len = self._len - old_len + len(child)
1281
return self._search_prefix, [("", self)]
1283
def _new_child(self, search_key, klass):
1284
"""Create a new child node of type klass."""
1286
child.set_maximum_size(self._maximum_size)
1287
child._key_width = self._key_width
1288
child._search_key_func = self._search_key_func
1289
self._items[search_key] = child
1292
def serialise(self, store):
1293
"""Serialise the node to store.
1295
:param store: A VersionedFiles honouring the CHK extensions.
1296
:return: An iterable of the keys inserted by this operation.
1298
for node in viewvalues(self._items):
1299
if isinstance(node, StaticTuple):
1300
# Never deserialised.
1302
if node._key is not None:
1305
for key in node.serialise(store):
1307
lines = ["chknode:\n"]
1308
lines.append("%d\n" % self._maximum_size)
1309
lines.append("%d\n" % self._key_width)
1310
lines.append("%d\n" % self._len)
1311
if self._search_prefix is None:
1312
raise AssertionError("_search_prefix should not be None")
1313
lines.append('%s\n' % (self._search_prefix,))
1314
prefix_len = len(self._search_prefix)
1315
for prefix, node in sorted(viewitems(self._items)):
1316
if isinstance(node, StaticTuple):
1320
serialised = "%s\x00%s\n" % (prefix, key)
1321
if not serialised.startswith(self._search_prefix):
1322
raise AssertionError("prefixes mismatch: %s must start with %s"
1323
% (serialised, self._search_prefix))
1324
lines.append(serialised[prefix_len:])
1325
sha1, _, _ = store.add_lines((None,), (), lines)
1326
self._key = StaticTuple("sha1:" + sha1,).intern()
1327
_get_cache()[self._key] = ''.join(lines)
1330
def _search_key(self, key):
1331
"""Return the serialised key for key in this node."""
1332
# search keys are fixed width. All will be self._node_width wide, so we
1334
return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
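# Example: with a node width of 5 and the plain search key function, the
# search key for ('foo',) is padded to exactly the node width:
#   ('foo' + '\x00' * 5)[:5]  ->  'foo\x00\x00'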
1336
def _search_prefix_filter(self, key):
1337
"""Serialise key for use as a prefix filter in iteritems."""
1338
return self._search_key_func(key)[:self._node_width]
1340
def _split(self, offset):
1341
"""Split this node into smaller nodes starting at offset.
1343
:param offset: The offset to start the new child nodes at.
1344
:return: An iterable of (prefix, node) tuples. prefix is a byte
1345
prefix for reaching node.
1347
if offset >= self._node_width:
1348
for node in viewvalues(self._items):
1349
for result in node._split(offset):
1353
"""Return the references to other CHK's held by this node."""
1354
if self._key is None:
1355
raise AssertionError("unserialised nodes have no refs.")
1357
for value in viewvalues(self._items):
1358
if isinstance(value, StaticTuple):
1361
refs.append(value.key())
1364
def _compute_search_prefix(self, extra_key=None):
1365
"""Return the unique key prefix for this node.
1367
:return: A bytestring of the longest search key prefix that is
1368
unique within this node.
1370
self._search_prefix = self.common_prefix_for_keys(self._items)
1371
return self._search_prefix
1373
def unmap(self, store, key, check_remap=True):
1374
"""Remove key from this node and its children."""
1375
if not len(self._items):
1376
raise AssertionError("can't unmap in an empty InternalNode.")
1377
children = [node for node, _
1378
in self._iter_nodes(store, key_filter=[key])]
1384
unmapped = child.unmap(store, key)
1386
search_key = self._search_key(key)
1387
if len(unmapped) == 0:
1388
# All child nodes are gone, remove the child:
1389
del self._items[search_key]
1392
# Stash the returned node
1393
self._items[search_key] = unmapped
1394
if len(self._items) == 1:
1395
# this node is no longer needed:
1396
return list(viewvalues(self._items))[0]
1397
if isinstance(unmapped, InternalNode):
1400
return self._check_remap(store)
1404
def _check_remap(self, store):
1405
"""Check if all keys contained by children fit in a single LeafNode.
1407
:param store: A store to use for reading more nodes
1408
:return: Either self, or a new LeafNode which should replace self.
1410
# Logic for how we determine when we need to rebuild
1411
# 1) Implicitly unmap() is removing a key which means that the child
1412
# nodes are going to be shrinking by some extent.
1413
# 2) If all children are LeafNodes, it is possible that they could be
1414
# combined into a single LeafNode, which can then completely replace
1415
# this internal node with a single LeafNode
1416
# 3) If *one* child is an InternalNode, we assume it has already done
1417
# all the work to determine that its children cannot collapse, and
1418
# we can then assume that those nodes *plus* the current nodes don't
1419
# have a chance of collapsing either.
1420
# So a very cheap check is to just say if 'unmapped' is an
1421
# InternalNode, we don't have to check further.
1423
# TODO: Another alternative is to check the total size of all known
1424
# LeafNodes. If there is some formula we can use to determine the
1425
# final size without actually having to read in any more
1426
# children, it would be nice to have. However, we have to be
1427
# careful with stuff like nodes that pull out the common prefix
1428
# of each key, as adding a new key can change the common prefix
1429
# and cause size changes greater than the length of one key.
1430
# So for now, we just add everything to a new Leaf until it
1431
# splits, as we know that will give the right answer
1432
new_leaf = LeafNode(search_key_func=self._search_key_func)
1433
new_leaf.set_maximum_size(self._maximum_size)
1434
new_leaf._key_width = self._key_width
1435
# A batch_size of 16 was chosen because:
1436
# a) In testing, a 4k page held 14 times. So if we have more than 16
1437
# leaf nodes we are unlikely to hold them in a single new leaf
1438
# node. This still allows for 1 round trip
1439
# b) With 16-way fan out, we can still do a single round trip
1440
# c) With 255-way fan out, we don't want to read all 255 and destroy
1441
# the page cache, just to determine that we really don't need it.
1442
for node, _ in self._iter_nodes(store, batch_size=16):
1443
if isinstance(node, InternalNode):
1444
# Without looking at any leaf nodes, we are sure
1446
for key, value in viewitems(node._items):
1447
if new_leaf._map_no_split(key, value):
1449
trace.mutter("remap generated a new LeafNode")
1453
def _deserialise(bytes, key, search_key_func):
"""Helper for repository details - convert bytes to a node."""
1455
if bytes.startswith("chkleaf:\n"):
1456
node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
1457
elif bytes.startswith("chknode:\n"):
1458
node = InternalNode.deserialise(bytes, key,
1459
search_key_func=search_key_func)
1461
raise AssertionError("Unknown node type.")
1465
class CHKMapDifference(object):
1466
"""Iterate the stored pages and key,value pairs for (new - old).
1468
This class provides a generator over the stored CHK pages and the
1469
(key, value) pairs that are in any of the new maps and not in any of the
1472
Note that it may yield chk pages that are common (especially root nodes),
1473
but it won't yield (key,value) pairs that are common.
1476
def __init__(self, store, new_root_keys, old_root_keys,
1477
search_key_func, pb=None):
1478
# TODO: Should we add a StaticTuple barrier here? It would be nice to
1479
# force callers to use StaticTuple, because there will often be
1480
# lots of keys passed in here. And even if we cast it locally,
1481
# that just means that we will have *both* a StaticTuple and a
1482
# tuple() in memory, referring to the same object. (so a net
1483
# increase in memory, not a decrease.)
1485
self._new_root_keys = new_root_keys
1486
self._old_root_keys = old_root_keys
1488
# All uninteresting chks that we have seen. By the time they are added
1489
# here, they should be either fully ignored, or queued up for
1491
# TODO: This might grow to a large size if there are lots of merge
1492
# parents, etc. However, it probably doesn't scale to O(history)
1493
# like _processed_new_refs does.
1494
self._all_old_chks = set(self._old_root_keys)
1495
# All items that we have seen from the old_root_keys
1496
self._all_old_items = set()
1497
# These are interesting items which were either read, or already in the
1498
# interesting queue (so we don't need to walk them again)
1499
# TODO: processed_new_refs becomes O(all_chks), consider switching to
1501
self._processed_new_refs = set()
1502
self._search_key_func = search_key_func
1504
# The uninteresting and interesting nodes to be searched
1505
self._old_queue = []
1506
self._new_queue = []
1507
# Holds the (key, value) items found when processing the root nodes,
1508
# waiting for the uninteresting nodes to be walked
1509
self._new_item_queue = []
1512
def _read_nodes_from_store(self, keys):
1513
# We chose not to use _get_cache(), because we think in
1514
# terms of records to be yielded. Also, we expect to touch each page
1515
# only 1 time during this code. (We may want to evaluate saving the
1516
# raw bytes into the page cache, which would allow a working tree
1517
# update after the fetch to not have to read the bytes again.)
1518
as_st = StaticTuple.from_sequence
1519
stream = self._store.get_record_stream(keys, 'unordered', True)
1520
for record in stream:
1521
if self._pb is not None:
1523
if record.storage_kind == 'absent':
1524
raise errors.NoSuchRevision(self._store, record.key)
1525
bytes = record.get_bytes_as('fulltext')
1526
node = _deserialise(bytes, record.key,
1527
search_key_func=self._search_key_func)
1528
if isinstance(node, InternalNode):
1529
# Note we don't have to do node.refs() because we know that
1530
# there are no children that have been pushed into this node
1531
# Note: Using as_st() here seemed to save 1.2MB, which would
1532
# indicate that we keep 100k prefix_refs around while
1533
# processing. They *should* be shorter lived than that...
1534
# It does cost us ~10s of processing time
1535
prefix_refs = list(viewitems(node._items))
1539
# Note: We don't use a StaticTuple here. Profiling showed a
1540
# minor memory improvement (0.8MB out of 335MB peak 0.2%)
1541
# But a significant slowdown (15s / 145s, or 10%)
1542
items = list(viewitems(node._items))
1543
yield record, node, prefix_refs, items
1545
def _read_old_roots(self):
1546
old_chks_to_enqueue = []
1547
all_old_chks = self._all_old_chks
1548
for record, node, prefix_refs, items in \
1549
self._read_nodes_from_store(self._old_root_keys):
1550
# Uninteresting node
1551
prefix_refs = [p_r for p_r in prefix_refs
1552
if p_r[1] not in all_old_chks]
1553
new_refs = [p_r[1] for p_r in prefix_refs]
1554
all_old_chks.update(new_refs)
1555
# TODO: This might be a good time to turn items into StaticTuple
1556
# instances and possibly intern them. However, this does not
1557
# impact 'initial branch' performance, so I'm not worrying
1559
self._all_old_items.update(items)
1560
# Queue up the uninteresting references
1561
# Don't actually put them in the 'to-read' queue until we have
1562
# finished checking the interesting references
1563
old_chks_to_enqueue.extend(prefix_refs)
1564
return old_chks_to_enqueue
1566
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1567
# At this point, we have read all the uninteresting and interesting
1568
# items, so we can queue up the uninteresting stuff, knowing that we've
1569
# handled the interesting ones
1570
for prefix, ref in old_chks_to_enqueue:
1571
not_interesting = True
1572
for i in range(len(prefix), 0, -1):
1573
if prefix[:i] in new_prefixes:
1574
not_interesting = False
1577
# This prefix is not part of the remaining 'interesting set'
1579
self._old_queue.append(ref)
1581
def _read_all_roots(self):
1582
"""Read the root pages.
1584
This is structured as a generator, so that the root records can be
1585
yielded up to whoever needs them without any buffering.
1587
# This is the bootstrap phase
1588
if not self._old_root_keys:
1589
# With no old_root_keys we can just shortcut and be ready
1590
# for _flush_new_queue
1591
self._new_queue = list(self._new_root_keys)
1593
old_chks_to_enqueue = self._read_old_roots()
1594
# filter out any root keys that are already known to be uninteresting
1595
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1596
# These are prefixes that are present in new_keys that we are
1598
new_prefixes = set()
1599
# We are about to yield all of these, so we don't want them getting
1600
# added a second time
1601
processed_new_refs = self._processed_new_refs
1602
processed_new_refs.update(new_keys)
1603
for record, node, prefix_refs, items in \
1604
self._read_nodes_from_store(new_keys):
1605
# At this level, we now know all the uninteresting references
1606
# So we filter and queue up whatever is remaining
1607
prefix_refs = [p_r for p_r in prefix_refs
1608
if p_r[1] not in self._all_old_chks
1609
and p_r[1] not in processed_new_refs]
1610
refs = [p_r[1] for p_r in prefix_refs]
1611
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1612
self._new_queue.extend(refs)
1613
# TODO: We can potentially get multiple items here, however the
1614
# current design allows for this, as callers will do the work
1615
# to make the results unique. We might profile whether we
1616
# gain anything by ensuring unique return values for items
1617
# TODO: This might be a good time to cast to StaticTuple, as
1618
# self._new_item_queue will hold the contents of multiple
1619
# records for an extended lifetime
1620
new_items = [item for item in items
1621
if item not in self._all_old_items]
1622
self._new_item_queue.extend(new_items)
1623
new_prefixes.update([self._search_key_func(item[0])
1624
for item in new_items])
1625
processed_new_refs.update(refs)
1627
# For new_prefixes we have the full length prefixes queued up.
1628
# However, we also need possible prefixes. (If we have a known ref to
1629
# 'ab', then we also need to include 'a'.) So expand the
1630
# new_prefixes to include all shorter prefixes
1631
for prefix in list(new_prefixes):
1632
new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
1633
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1635
def _flush_new_queue(self):
1636
# No need to maintain the heap invariant anymore, just pull things out
1638
refs = set(self._new_queue)
1639
self._new_queue = []
1640
# First pass, flush all interesting items and convert to using direct refs
1641
all_old_chks = self._all_old_chks
1642
processed_new_refs = self._processed_new_refs
1643
all_old_items = self._all_old_items
1644
new_items = [item for item in self._new_item_queue
1645
if item not in all_old_items]
1646
self._new_item_queue = []
1648
yield None, new_items
1649
refs = refs.difference(all_old_chks)
1650
processed_new_refs.update(refs)
1652
# TODO: Using a SimpleSet for self._processed_new_refs and
1653
# saved as much as 10MB of peak memory. However, it requires
1654
# implementing a non-pyrex version.
1656
next_refs_update = next_refs.update
1657
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1658
# from 1m54s to 1m51s. Consider it.
1659
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1661
# using the 'if' check saves about 145s => 141s, when
1662
# streaming initial branch of Launchpad data.
1663
items = [item for item in items
1664
if item not in all_old_items]
1666
next_refs_update([p_r[1] for p_r in p_refs])
1668
# set1.difference(set/dict) walks all of set1, and checks if it
1669
# exists in 'other'.
1670
# set1.difference(iterable) walks all of iterable, and does a
1671
# 'difference_update' on a clone of set1. Pick wisely based on the
1672
# expected sizes of objects.
1673
# in our case it is expected that 'new_refs' will always be quite
1675
next_refs = next_refs.difference(all_old_chks)
1676
next_refs = next_refs.difference(processed_new_refs)
1677
processed_new_refs.update(next_refs)
1680
def _process_next_old(self):
1681
# Since we don't filter uninteresting any further than during
1682
# _read_all_roots, process the whole queue in a single pass.
1683
refs = self._old_queue
1684
self._old_queue = []
1685
all_old_chks = self._all_old_chks
1686
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1687
# TODO: Use StaticTuple here?
1688
self._all_old_items.update(items)
1689
refs = [r for _, r in prefix_refs if r not in all_old_chks]
1690
self._old_queue.extend(refs)
1691
all_old_chks.update(refs)
1693
def _process_queues(self):
1694
while self._old_queue:
1695
self._process_next_old()
1696
return self._flush_new_queue()
1699
for record in self._read_all_roots():
1701
for record, items in self._process_queues():
1705
def iter_interesting_nodes(store, interesting_root_keys,
1706
uninteresting_root_keys, pb=None):
1707
"""Given root keys, find interesting nodes.
1709
Evaluate nodes referenced by interesting_root_keys. Ones that are also
1710
referenced from uninteresting_root_keys are not considered interesting.
1712
:param interesting_root_keys: keys which should be part of the
1713
"interesting" nodes (which will be yielded)
1714
:param uninteresting_root_keys: keys which should be filtered out of the
1717
(interesting record, {interesting key:values})
1719
iterator = CHKMapDifference(store, interesting_root_keys,
1720
uninteresting_root_keys,
1721
search_key_func=store._search_key_func,
1723
return iterator.process()
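# Usage sketch (assumes `chk_store` is a repository's chk_bytes store, which
# carries the _search_key_func used above): records are CHK pages that may be
# new, and items are (key, value) pairs present only in the new maps.
#
#   for record, items in iter_interesting_nodes(
#           chk_store, [new_root_key], [old_root_key]):
#       if record is not None:
#           pass  # stream/copy the page
#       for key, value in items:
#           pass  # process the newly introduced mapping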
1727
from breezy._chk_map_pyx import (
1731
_deserialise_leaf_node,
1732
_deserialise_internal_node,
1734
except ImportError as e:
1735
osutils.failed_to_load_extension(e)
1736
from breezy._chk_map_py import (
1740
_deserialise_leaf_node,
1741
_deserialise_internal_node,
1743
search_key_registry.register('hash-16-way', _search_key_16)
1744
search_key_registry.register('hash-255-way', _search_key_255)
1747
def _check_key(key):
1748
"""Helper function to assert that a key is properly formatted.
1750
This generally shouldn't be used in production code, but it can be helpful
1753
if not isinstance(key, StaticTuple):
1754
raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
1756
raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
1757
if not isinstance(key[0], str):
1758
raise TypeError('key %r should hold a str, not %r'
1759
% (key, type(key[0])))
1760
if not key[0].startswith('sha1:'):
1761
raise ValueError('key %r should point to a sha1:' % (key,))
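# Example of a key that passes _check_key (an illustrative sketch):
#   _check_key(StaticTuple('sha1:0123456789abcdef0123456789abcdef01234567',))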