import heapq
import threading

from .. import lazy_import
lazy_import.lazy_import(globals(), """
""")
from ..static_tuple import StaticTuple

# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4 * 1024 * 1024

# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   cache thrashing.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
_thread_caches.page_cache = None


def _get_cache():
    """Get the per-thread page cache.

    We need a function to do this because in a new thread the _thread_caches
    threading.local object does not have the cache initialized yet.
    """
    page_cache = getattr(_thread_caches, 'page_cache', None)
    if page_cache is None:
        # We are caching bytes so len(value) is perfectly accurate
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
        _thread_caches.page_cache = page_cache
    return page_cache
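# Illustrative sketch (not part of the original module): the per-thread cache
# maps CHK record keys to the raw serialised bytes of a node, so hot pages are
# not repeatedly read and deserialised. Assuming a StaticTuple key such as
# (b'sha1:...',), usage looks roughly like:
#
#   cache = _get_cache()
#   cache[key] = serialised_bytes   # stored; LRUSizeCache evicts by total size
#   data = cache[key]               # raises KeyError on a cache miss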
# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
# If we delete more than this many nodes applying a delta, we check for a remap
_INTERESTING_DELETES_LIMIT = 5


def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return b'\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register(b'plain', _search_key_plain)
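# Illustrative example (assumed values, not from the original source): with
# key_width=2, a key like (b'parent-id', b'file-id') is flattened by
# _search_key_plain into the search key b'parent-id\x00file-id'. The other
# registered functions ('hash-16-way', 'hash-255-way') hash each element
# instead, which spreads keys more evenly across internal nodes.
#
#   _search_key_plain((b'foo', b'bar'))   # -> b'foo\x00bar'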
class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""

    __slots__ = ('_store', '_root_node', '_search_key_func')

    def __init__(self, store, root_key, search_key_func=None):
        """Create a CHKMap object."""

    def apply_delta(self, delta):
        """Apply a delta to the map.

        :param delta: An iterable of old_key, new_key, new_value tuples;
            if new_key is not None, then new_key->new_value is inserted
            into the map; if old_key is not None, then the old mapping
            of old_key is removed.
        """
        # Check preconditions first.
        as_st = StaticTuple.from_sequence
        new_items = {as_st(key) for (old, key, value) in delta
                     if key is not None and old is None}
        existing_new = list(self.iteritems(key_filter=new_items))
        if existing_new:
            raise errors.InconsistentDeltaDelta(delta,
                "New items are already in the map %r." % existing_new)
        # Now apply changes.
        for old, new, value in delta:
            if old is not None and old != new:
                self.unmap(old, check_remap=False)
        for old, new, value in delta:
            if new is not None:
                self.map(new, value)
        if delete_count > _INTERESTING_DELETES_LIMIT:
            trace.mutter("checking remap as %d deletions", delete_count)
            self._check_remap()
        return self._save()
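    # Illustrative sketch (assumed shapes, not from the original source): a
    # delta is a sequence of (old_key, new_key, value) triples, e.g.
    #
    #   delta = [
    #       (None, (b'new-file-id',), b'serialised entry'),   # insertion
    #       ((b'old-file-id',), None, None),                  # deletion
    #   ]
    #   new_root_key = chkmap.apply_delta(delta)
    #
    # Inserting a key that is already present raises InconsistentDeltaDelta.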
    def _ensure_root(self):
        """Ensure that the root node is an object not a key."""
        if isinstance(self._root_node, StaticTuple):
            # Demand-load the root
            self._root_node = self._get_node(self._root_node)

    def _get_node(self, node):
        """Return the node object for node, loading it if necessary.

        :param node: A tuple key or node object.
        :return: A node object.
        """
        if isinstance(node, StaticTuple):
            bytes = self._read_bytes(node)
            return _deserialise(bytes, node,
                                search_key_func=self._search_key_func)

    def _read_bytes(self, key):
        try:
            return _get_cache()[key]
        except KeyError:
            stream = self._store.get_record_stream([key], 'unordered', True)
            bytes = next(stream).get_bytes_as('fulltext')
            _get_cache()[key] = bytes
            return bytes
    def _dump_tree(self, include_keys=False, encoding='utf-8'):
        """Return the tree in a string representation."""
        self._ensure_root()

        def decode(x): return x.decode(encoding)
        res = self._dump_tree_node(self._root_node, prefix=b'', indent='',
                                   decode=decode, include_keys=include_keys)
        res.append('')  # Give a trailing '\n'
        return '\n'.join(res)
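    # Example of the dump format (approximate; exact spacing depends on the
    # node layout): each line shows the prefix a node handles, the node class,
    # and for leaves the decoded key/value pairs, e.g.
    #
    #   '' InternalNode
    #     'a' LeafNode
    #         ('aaa',) 'initial aaa content'
    #     'b' LeafNode
    #         ('bbb',) 'initial bbb content'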
    def _dump_tree_node(self, node, prefix, indent, decode, include_keys=True):
        """For this node and all children, generate a string representation."""
        result = []
        if not include_keys:
            key_str = ''
        else:
            node_key = node.key()
            if node_key is not None:
                key_str = ' %s' % (decode(node_key[0]),)
            else:
                key_str = ' None'
        result.append('%s%r %s%s' % (indent, decode(prefix), node.__class__.__name__,
                                     key_str))
        if isinstance(node, InternalNode):
            # Trigger all child nodes to get loaded
            list(node._iter_nodes(self._store))
            for prefix, sub in sorted(node._items.items()):
                result.extend(self._dump_tree_node(sub, prefix, indent + ' ',
                                                   decode=decode, include_keys=include_keys))
        else:
            for key, value in sorted(node._items.items()):
                # Don't use prefix nor indent here to line up when used in
                # tests in conjunction with assertEqualDiff
                result.append(' %r %r' % (
                    tuple([decode(ke) for ke in key]), decode(value)))
        return result
    @classmethod
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
                  search_key_func=None):
        """Create a CHKMap in store with initial_value as the content.

        :param store: The store to record initial_value in, a VersionedFiles
        :return: The root chk of the resulting CHKMap.
        """
        root_key = klass._create_directly(store, initial_value,
            maximum_size=maximum_size, key_width=key_width,
            search_key_func=search_key_func)
        if not isinstance(root_key, StaticTuple):
            raise AssertionError('we got a %s instead of a StaticTuple'
                                 % (type(root_key),))
        return root_key
        node = LeafNode(search_key_func=search_key_func)
        node.set_maximum_size(maximum_size)
        node._key_width = key_width
        as_st = StaticTuple.from_sequence
        node._items = dict((as_st(key), val)
                           for key, val in initial_value.items())
        node._raw_size = sum(node._key_value_len(key, value)
                             for key, value in node._items.items())
        node._len = len(node._items)
        node._compute_search_prefix()
        node._compute_serialised_prefix()
        if (node._len > 1 and
                node._current_size() > maximum_size):
            prefix, node_details = node._split(store)
            if len(node_details) == 1:
                raise AssertionError('Failed to split using node._split')
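    # Illustrative usage (hypothetical values, not from the original source):
    # build a map directly from a dict and get its root key back.
    #
    #   root_key = CHKMap.from_dict(
    #       store, {(b'file-id',): b'entry bytes'},
    #       maximum_size=255, search_key_func=_search_key_plain)
    #   chkmap = CHKMap(store, root_key)
    #
    # Nodes are split once their serialised form exceeds maximum_size.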
        # key_path (a list of tuples, tail-sharing down the tree.)
        self_pending = []
        basis_pending = []

        def process_node(node, path, a_map, pending):
            # take a node and expand it
            node = a_map._get_node(node)
            if isinstance(node, LeafNode):
                path = (node._key, path)
                for key, value in node._items.items():
                    # For a LeafNode, the key is a serialized_key, rather than
            else:
                path = (node._key, path)
                for prefix, child in node._items.items():
                    heapq.heappush(pending, (prefix, None, child, path))

        def process_common_internal_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            path = (basis_node._key, None)
            for prefix, child in basis_items - self_items:
                heapq.heappush(basis_pending, (prefix, None, child, path))

        def process_common_leaf_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            for key, value in basis_items - self_items:
                prefix = basis._search_key_func(key)
                heapq.heappush(basis_pending, (prefix, key, value, path))
336
def process_common_prefix_nodes(self_node, self_path,
366
337
basis_node, basis_path):
367
338
# Would it be more efficient if we could request both at the same
369
340
self_node = self._get_node(self_node)
370
341
basis_node = basis._get_node(basis_node)
371
if (isinstance(self_node, InternalNode) and
372
isinstance(basis_node, InternalNode)):
342
if (type(self_node) == InternalNode
343
and type(basis_node) == InternalNode):
373
344
# Matching internal nodes
374
345
process_common_internal_nodes(self_node, basis_node)
375
elif (isinstance(self_node, LeafNode) and
376
isinstance(basis_node, LeafNode)):
346
elif (type(self_node) == LeafNode
347
and type(basis_node) == LeafNode):
377
348
process_common_leaf_nodes(self_node, basis_node)
379
350
process_node(self_node, self_path, self, self_pending)
        self_seen = set()
        basis_seen = set()
        excluded_keys = set()

        def check_excluded(key_path):
            # Note that this is N^2, it depends on us trimming trees
            # aggressively to not become slow.
            # A better implementation would probably have a reverse map
            # back to the children of a node, and jump straight to it when
            # a common node is detected, then proceed to remove the already
            # pending children. breezy.graph has a searcher module with a
            # similar problem.
            while key_path is not None:
                key, key_path = key_path
                        basis_details = heapq.heappop(basis_pending)
                        if self_details[2] != basis_details[2]:
                            yield (self_details[1],
                                   basis_details[2], self_details[2])
                    # At least one side wasn't a simple value
                    if (self._node_key(self_pending[0][2])
                            == self._node_key(basis_pending[0][2])):
                        # Identical pointers, skip (and don't bother adding to
                        # excluded, it won't turn up again.
                        heapq.heappop(self_pending)
    def iteritems(self, key_filter=None):
        """Iterate over the entire CHKMap's contents."""
        self._ensure_root()
        if key_filter is not None:
            as_st = StaticTuple.from_sequence
            key_filter = [as_st(key) for key in key_filter]
        return self._root_node.iteritems(self._store, key_filter=key_filter)
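    # Illustrative usage (hypothetical keys, not from the original source):
    # iterate everything, or restrict the walk to a few keys so that only the
    # pages that can contain them are read from the store.
    #
    #   all_items = dict(chkmap.iteritems())
    #   some_items = dict(chkmap.iteritems(key_filter=[(b'file-id-1',)]))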
    def key(self):
        """Return the key for this map."""
        if isinstance(self._root_node, StaticTuple):
            return self._root_node
        else:
            return self._root_node._key
    def map(self, key, value):
        """Map a key tuple to value.

        :param key: A key to map.
        :param value: The value to assign to key.
        """
        key = StaticTuple.from_sequence(key)
        # Need a root object.
        self._ensure_root()
        prefix, node_details = self._root_node.map(self._store, key, value)
        if len(node_details) == 1:
            self._root_node = node_details[0][1]
        else:
            self._root_node = InternalNode(prefix,
                                           search_key_func=self._search_key_func)
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
            self._root_node._key_width = node_details[0][1]._key_width
            for split, node in node_details:
                self._root_node.add_node(split, node)
    def _node_key(self, node):
        """Get the key for a node whether it's a tuple or node."""
        if isinstance(node, tuple):
            node = StaticTuple.from_sequence(node)
        if isinstance(node, StaticTuple):
            return node
        else:
            return node._key
def unmap(self, key, check_remap=True):
561
525
"""remove key from the map."""
562
key = StaticTuple.from_sequence(key)
563
526
self._ensure_root()
564
if isinstance(self._root_node, InternalNode):
527
if type(self._root_node) is InternalNode:
565
528
unmapped = self._root_node.unmap(self._store, key,
566
check_remap=check_remap)
529
check_remap=check_remap)
568
531
unmapped = self._root_node.unmap(self._store, key)
569
532
self._root_node = unmapped
    def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
        if isinstance(self._root_node, InternalNode):
            self._root_node = self._root_node._check_remap(self._store)

    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if isinstance(self._root_node, StaticTuple):
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
    adding the header bytes, and without prefix compression.
    """

    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
                 )

    def __init__(self, key_width=1):
        """Create a node.
    the key/value pairs.
    """

    __slots__ = ('_common_serialised_prefix',)

    def __init__(self, search_key_func=None):
        Node.__init__(self)
        # All of the keys in this leaf node share this common prefix
        self._common_serialised_prefix = None
        if search_key_func is None:
            self._search_key_func = _search_key_plain

        '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
               self._maximum_size, self._search_prefix, self._key_width, items_str)
def _current_size(self):
716
673
"""Answer the current serialised size of this node.
728
685
prefix_len = len(self._common_serialised_prefix)
729
686
bytes_for_items = (self._raw_size - (prefix_len * self._len))
730
return (9 + # 'chkleaf:\n' +
731
len(str(self._maximum_size)) + 1 +
732
len(str(self._key_width)) + 1 +
733
len(str(self._len)) + 1 +
687
return (9 # 'chkleaf:\n'
688
+ len(str(self._maximum_size)) + 1
689
+ len(str(self._key_width)) + 1
690
+ len(str(self._len)) + 1
    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """
        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_leaf_node(bytes, key,
                                      search_key_func=search_key_func)
                    # Short items, we need to match based on a prefix
                    filters.setdefault(len(key), set()).add(key)
            filters_itemview = filters.items()
            for item in self._items.items():
                for length, length_filter in filters_itemview:
                    if item[0][:length] in length_filter:
                        yield item
        else:
            yield from self._items.items()
    def _key_value_len(self, key, value):
        # TODO: Should probably be done without actually joining the key, but
        #       then that can be done via the C extension
        return (len(self._serialise_key(key)) + 1 +
                len(b'%d' % value.count(b'\n')) + 1 +
                len(value) + 1)

    def _search_key(self, key):
        return self._search_key_func(key)
            self._search_prefix = self.common_prefix(
                self._search_prefix, search_key)
        if (self._len > 1 and
                self._maximum_size and
                self._current_size() > self._maximum_size):
            # Check to see if all of the search_keys for this node are
            # identical. We allow the node to grow under that circumstance
            # (we could track this as common state, but it is infrequent)
            if (search_key != self._search_prefix or
                    not self._are_search_keys_identical()):
        common_prefix = self._search_prefix
        split_at = len(common_prefix) + 1
        result = {}
        for key, value in self._items.items():
            search_key = self._search_key(key)
            prefix = search_key[:split_at]
            # TODO: Generally only 1 key can be exactly the right length,
            # may get a '\00' node anywhere, but won't have keys of
            # different lengths.
            if len(prefix) < split_at:
                prefix += b'\x00' * (split_at - len(prefix))
            if prefix not in result:
                node = LeafNode(search_key_func=self._search_key_func)
                node.set_maximum_size(self._maximum_size)
                result.pop(prefix)
                new_node = InternalNode(sub_prefix,
                                        search_key_func=self._search_key_func)
                new_node.set_maximum_size(self._maximum_size)
                new_node._key_width = self._key_width
                for split, node in node_details:
                    new_node.add_node(split, node)
                result[prefix] = new_node
        return common_prefix, list(result.items())
    def map(self, store, key, value):
        """Map key to value."""
        if self._search_prefix is _unknown:
            raise AssertionError('%r must be known' % self._search_prefix)
        return self._search_prefix, [(b"", self)]

    _serialise_key = b'\x00'.join

    def serialise(self, store):
        """Serialise the LeafNode to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        lines = [b"chkleaf:\n"]
        lines.append(b"%d\n" % self._maximum_size)
        lines.append(b"%d\n" % self._key_width)
        lines.append(b"%d\n" % self._len)
        if self._common_serialised_prefix is None:
            if len(self._items) != 0:
                raise AssertionError('If _common_serialised_prefix is None'
                                     ' we should have no items')
        else:
            lines.append(b'%s\n' % (self._common_serialised_prefix,))
            prefix_len = len(self._common_serialised_prefix)
        for key, value in sorted(self._items.items()):
            # Always add a final newline
            value_lines = osutils.chunks_to_lines([value + b'\n'])
            serialized = b"%s\x00%d\n" % (self._serialise_key(key),
                                          len(value_lines))
            if not serialized.startswith(self._common_serialised_prefix):
                raise AssertionError('We thought the common prefix was %r'
                                     ' but entry %r does not have it in common'
                                     % (self._common_serialised_prefix, serialized))
            lines.append(serialized[prefix_len:])
            lines.extend(value_lines)
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
        data = b''.join(lines)
        if len(data) != self._current_size():
            raise AssertionError('Invalid _current_size')
        _get_cache()[self._key] = data
        return [self._key]
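    # Illustrative layout (reconstructed from the code above, values are
    # hypothetical): a serialised LeafNode is a "chkleaf:" header followed by
    # maximum_size, key_width, length, the common serialised prefix, and then
    # one "key\x00num_lines\n" entry plus its value lines per item, e.g.
    #
    #   chkleaf:
    #   4096
    #   1
    #   2
    #   <common prefix>
    #   <key suffix>\x001
    #   <value line>
    #   ...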
    LeafNode or InternalNode.
    """

    __slots__ = ('_node_width',)

    def __init__(self, prefix=b'', search_key_func=None):
        Node.__init__(self)
        # The size of an internalnode with default values and no children.
        # How many octets key prefixes within this node are.

    def add_node(self, prefix, node):
        if self._search_prefix is None:
            raise AssertionError("_search_prefix should not be None")
        if not prefix.startswith(self._search_prefix):
            raise AssertionError("prefixes mismatch: %s must start with %s"
                                 % (prefix, self._search_prefix))
        if len(prefix) != len(self._search_prefix) + 1:
            raise AssertionError("prefix wrong length: len(%s) is not %d" %
                                 (prefix, len(self._search_prefix) + 1))
        self._len += len(node)
        if not len(self._items):
            self._node_width = len(prefix)
        if self._node_width != len(self._search_prefix) + 1:
            raise AssertionError("node width mismatch: %d is not %d" %
                                 (self._node_width, len(self._search_prefix) + 1))
        self._items[prefix] = node
        self._key = None

    def _current_size(self):
        """Answer the current serialised size of this node."""
        return (self._raw_size + len(str(self._len)) + len(str(self._key_width))
                + len(str(self._maximum_size)))
    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """
        :param key: The key that the serialised node has.
        :return: An InternalNode instance.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_internal_node(bytes, key,
                                          search_key_func=search_key_func)

            # yielding all nodes, yield whatever we have, and queue up a read
            # for whatever we are missing
            shortcut = True
            for prefix, node in self._items.items():
                if node.__class__ is StaticTuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None
                # A given key can only match 1 child node, if it isn't
                # there, then we can just return nothing
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, [key])
                # This is loaded, and the only thing that can match,
            for key in key_filter:
                search_prefix = self._search_prefix_filter(key)
                length_filter = length_filters.setdefault(
                    len(search_prefix), set())
                length_filter.add(search_prefix)
                prefix_to_keys.setdefault(search_prefix, []).append(key)
            if (self._node_width in length_filters and
                    len(length_filters) == 1):
                # all of the search prefixes match exactly _node_width. This
                # means that everything is an exact match, and we can do a
                # lookup into self._items, rather than iterating over the items
                # We can ignore this one
                node_key_filter = prefix_to_keys[search_prefix]
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, node_key_filter)
                else:
                    yield node, node_key_filter
            else:
                # The slow way. We walk every item in self._items, and check to
                # see if there are any matches
                length_filters_itemview = length_filters.items()
                for prefix, node in self._items.items():
                    node_key_filter = []
                    for length, length_filter in length_filters_itemview:
                        sub_prefix = prefix[:length]
                        if sub_prefix in length_filter:
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
                    if node_key_filter: # this key matched something, yield it
                        if node.__class__ is StaticTuple:
                            keys[node] = (prefix, node_key_filter)
                        else:
                            yield node, node_key_filter
            found_keys = set()
            for key in keys:
                try:
                    bytes = _get_cache()[key]
                except KeyError:
                    continue
                else:
                    node = _deserialise(bytes, key,
                                        search_key_func=self._search_key_func)
                    prefix, node_key_filter = keys[key]
                    self._items[prefix] = node
                    found_keys.add(key)
            for record in stream:
                bytes = record.get_bytes_as('fulltext')
                node = _deserialise(bytes, record.key,
                                    search_key_func=self._search_key_func)
                prefix, node_key_filter = keys[record.key]
                node_and_filters.append((node, node_key_filter))
                self._items[prefix] = node
                _get_cache()[record.key] = bytes
            for info in node_and_filters:
        search_key = self._search_key(key)
        if self._node_width != len(self._search_prefix) + 1:
            raise AssertionError("node width mismatch: %d is not %d" %
                                 (self._node_width, len(self._search_prefix) + 1))
        if not search_key.startswith(self._search_prefix):
            # This key doesn't fit in this index, so we need to split at the
            # point where it would fit, insert self into that internal node,
            new_prefix = self.common_prefix(self._search_prefix,
                                            search_key)
            new_parent = InternalNode(new_prefix,
                                      search_key_func=self._search_key_func)
            new_parent.set_maximum_size(self._maximum_size)
            new_parent._key_width = self._key_width
            new_parent.add_node(self._search_prefix[:len(new_prefix) + 1],
                                self)
            return new_parent.map(store, key, value)
        children = [node for node, _ in self._iter_nodes(
            store, key_filter=[key])]
1178
child = children[0]
1227
1180
# new child needed:
1228
1181
child = self._new_child(search_key, LeafNode)
1229
1182
old_len = len(child)
1230
if isinstance(child, LeafNode):
1183
if type(child) is LeafNode:
1231
1184
old_size = child._current_size()
1233
1186
old_size = None
1239
1192
self._items[search_key] = child
1240
1193
self._key = None
1241
1194
new_node = self
1242
if isinstance(child, LeafNode):
1195
if type(child) is LeafNode:
1243
1196
if old_size is None:
1244
1197
# The old node was an InternalNode which means it has now
1245
1198
# collapsed, so we need to check if it will chain to a
1255
1208
# amount is over a configurable limit.
1256
1209
new_size = child._current_size()
1257
1210
shrinkage = old_size - new_size
1258
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE or
1259
shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1211
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
1212
or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1261
1214
"checking remap as size shrunk by %d to be %d",
1262
1215
shrinkage, new_size)
1263
1216
new_node = self._check_remap(store)
1264
1217
if new_node._search_prefix is None:
1265
1218
raise AssertionError("_search_prefix should not be None")
1266
return new_node._search_prefix, [(b'', new_node)]
1219
return new_node._search_prefix, [('', new_node)]
1267
1220
# child has overflown - create a new intermediate node.
1268
1221
# XXX: This is where we might want to try and expand our depth
1269
1222
# to refer to more bytes of every child (which would give us
1274
1227
child.add_node(split, node)
1275
1228
self._len = self._len - old_len + len(child)
1276
1229
self._key = None
1277
return self._search_prefix, [(b"", self)]
1230
return self._search_prefix, [("", self)]
    def _new_child(self, search_key, klass):
        """Create a new child node of type klass."""

    def serialise(self, store):
        """
        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        for node in self._items.values():
            if isinstance(node, StaticTuple):
                # Never deserialised.
                continue
            if node._key is not None:
                continue
            for key in node.serialise(store):
                yield key
        lines = [b"chknode:\n"]
        lines.append(b"%d\n" % self._maximum_size)
        lines.append(b"%d\n" % self._key_width)
        lines.append(b"%d\n" % self._len)
        if self._search_prefix is None:
            raise AssertionError("_search_prefix should not be None")
        lines.append(b'%s\n' % (self._search_prefix,))
        prefix_len = len(self._search_prefix)
        for prefix, node in sorted(self._items.items()):
            if isinstance(node, StaticTuple):
                key = node[0]
            else:
                key = node._key[0]
            serialised = b"%s\x00%s\n" % (prefix, key)
            if not serialised.startswith(self._search_prefix):
                raise AssertionError("prefixes mismatch: %s must start with %s"
                                     % (serialised, self._search_prefix))
            lines.append(serialised[prefix_len:])
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple(b"sha1:" + sha1,).intern()
        _get_cache()[self._key] = b''.join(lines)
        yield self._key
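    # Illustrative layout (reconstructed from the code above, values are
    # hypothetical): a serialised InternalNode is a "chknode:" header followed
    # by maximum_size, key_width, length, the common search prefix, and then
    # one pointer line per child holding the child prefix (with the common
    # prefix stripped), a NUL byte, and the child's CHK key, e.g.
    #
    #   chknode:
    #   4096
    #   1
    #   120
    #   a
    #   a\x00sha1:1111...      (child handling prefix 'aa')
    #   b\x00sha1:2222...      (child handling prefix 'ab')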
    def _search_key(self, key):
        """Return the serialised key for key in this node."""
        # search keys are fixed width. All will be self._node_width wide, so we
        # pad as necessary.
        return (self._search_key_func(key) + b'\x00' * self._node_width)[:self._node_width]
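    # Worked example (assumed values): with _node_width = 4 and the plain
    # search key function, a key (b'ab',) maps to b'ab\x00\x00' -- the search
    # key is padded with NUL bytes and then truncated so every prefix handled
    # by this node has exactly _node_width octets.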
    def _search_prefix_filter(self, key):
        """Serialise key for use as a prefix filter in iteritems."""

    def _split(self, offset):
        """
            prefix for reaching node.
        """
        if offset >= self._node_width:
            for node in self._items.values():
                for result in node._split(offset):
                    yield result
        for key, node in self._items.items():
    def refs(self):
        """Return the references to other CHK's held by this node."""
        if self._key is None:
            raise AssertionError("unserialised nodes have no refs.")
        refs = []
        for value in self._items.values():
            if isinstance(value, StaticTuple):
                refs.append(value)
            else:
                refs.append(value.key())
        return refs

        return self._search_prefix
    def unmap(self, store, key, check_remap=True):
        """Remove key from this node and its children."""
        if not len(self._items):
            raise AssertionError("can't unmap in an empty InternalNode.")
        children = [node for node, _
                    in self._iter_nodes(store, key_filter=[key])]
        child = children[0]
        self._items[search_key] = unmapped
        if len(self._items) == 1:
            # this node is no longer needed:
            return list(self._items.values())[0]
        if isinstance(unmapped, InternalNode):
            if check_remap:
                return self._check_remap(store)
        # c) With 255-way fan out, we don't want to read all 255 and destroy
        # the page cache, just to determine that we really don't need it.
        for node, _ in self._iter_nodes(store, batch_size=16):
            if isinstance(node, InternalNode):
                # Without looking at any leaf nodes, we are sure
                return self
            for key, value in node._items.items():
                if new_leaf._map_no_split(key, value):
                    return self
        trace.mutter("remap generated a new LeafNode")
        return new_leaf
def _deserialise(data, key, search_key_func):
    """Helper for repositorydetails - convert bytes to a node."""
    if data.startswith(b"chkleaf:\n"):
        node = LeafNode.deserialise(data, key, search_key_func=search_key_func)
    elif data.startswith(b"chknode:\n"):
        node = InternalNode.deserialise(data, key,
                                        search_key_func=search_key_func)
    else:
        raise AssertionError("Unknown node type.")
    return node
    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        self._processed_new_refs = set()
        self._search_key_func = search_key_func
        self._state = None
    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if isinstance(node, InternalNode):
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                prefix_refs = list(node._items.items())
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
                #       But a significant slowdown (15s / 145s, or 10%)
                items = list(node._items.items())
            yield record, node, prefix_refs, items
    def _read_old_roots(self):
        self._read_nodes_from_store(self._old_root_keys):
            # Uninteresting node
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in range(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
            # At this level, we now know all the uninteresting references
            # So we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_old_chks and
                              p_r[1] not in processed_new_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
            self._new_queue.extend(refs)
            # current design allows for this, as callers will do the work
            # to make the results unique. We might profile whether we
            # gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            new_prefixes.update([self._search_key_func(item[0])
                                 for item in new_items])
        # 'ab', then we also need to include 'a'.) So expand the
        # new_prefixes to include all shorter prefixes
        for prefix in list(new_prefixes):
            new_prefixes.update([prefix[:i] for i in range(1, len(prefix))])
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
    def _flush_new_queue(self):
        processed_new_refs = self._processed_new_refs
        all_old_items = self._all_old_items
        new_items = [item for item in self._new_item_queue
                     if item not in all_old_items]
        self._new_item_queue = []
        yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        # TODO: Using a SimpleSet for self._processed_new_refs and
        #       saved as much as 10MB of peak memory. However, it requires
        #       implementing a non-pyrex version.
        next_refs = set()
        next_refs_update = next_refs.update
        # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
        # from 1m54s to 1m51s. Consider it.
        for record, _, p_refs, items in self._read_nodes_from_store(refs):
            # using the 'if' check saves about 145s => 141s, when
            # streaming initial branch of Launchpad data.
            items = [item for item in items
                     if item not in all_old_items]
            yield record, items
            next_refs_update([p_r[1] for p_r in p_refs])
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # in our case it is expected that 'new_refs' will always be quite
        next_refs = next_refs.difference(all_old_chks)
        next_refs = next_refs.difference(processed_new_refs)
        processed_new_refs.update(next_refs)
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            # TODO: Use StaticTuple here?
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)
try:
    from ._chk_map_pyx import (
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError as e:
    osutils.failed_to_load_extension(e)
    from ._chk_map_py import (
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )


search_key_registry.register(b'hash-16-way', _search_key_16)
search_key_registry.register(b'hash-255-way', _search_key_255)
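# Illustrative lookup (not from the original source): callers typically record
# the name of the search key function a map was built with and resolve it back
# through this registry when reopening the map, e.g.
#
#   search_key_func = search_key_registry.get(b'hash-16-way')
#   chkmap = CHKMap(store, root_key, search_key_func=search_key_func)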
def _check_key(key):
    """Helper function to assert that a key is properly formatted.

    This generally shouldn't be used in production code, but it can be helpful
    """
    if not isinstance(key, StaticTuple):
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
    if len(key) != 1:
        raise ValueError('key %r should have length 1, not %d' %
                         (key, len(key),))
    if not isinstance(key[0], str):
        raise TypeError('key %r should hold a str, not %r'
                        % (key, type(key[0])))
    if not key[0].startswith('sha1:'):
        raise ValueError('key %r should point to a sha1:' % (key,))
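# Illustrative example (hypothetical digest): a well-formed CHK key is a
# one-element StaticTuple holding a "sha1:"-prefixed hex digest, e.g.
#
#   _check_key(StaticTuple('sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709',))
#
# Anything else (a plain tuple, a multi-element key, or a key that does not
# start with "sha1:") raises TypeError or ValueError.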