# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from bisect import bisect_right
from copy import deepcopy
import tempfile
import zlib

from bzrlib import (
    chunk_writer,
    debug,
    errors,
    index,
    lru_cache,
    osutils,
    trace,
    )
from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
from bzrlib.osutils import basename, dirname
from bzrlib.transport import get_transport

_BTSIGNATURE = "B+Tree Graph Index 2\n"
_OPTION_ROW_LENGTHS = "row_lengths="
_LEAF_FLAG = "type=leaf\n"
_INTERNAL_FLAG = "type=internal\n"
_INTERNAL_OFFSET = "offset="

_RESERVED_HEADER_BYTES = 120
# Pages are fixed at 4KiB; see NODE_COMPRESSED in the format docstring below.
_PAGE_SIZE = 4096

# 4K per page: 4MB - 1000 entries
_NODE_CACHE_SIZE = 1000
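
# Back-of-envelope check of the sizing comment above (illustrative only):
#
#   >>> _PAGE_SIZE * _NODE_CACHE_SIZE
#   4096000
#
# i.e. a full leaf-node cache holds roughly 4MB of page data.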


class _BuilderRow(object):
    """The stored state accumulated while writing out a row in the index.

    :ivar spool: A temporary file used to accumulate nodes for this row
        in the index.
    :ivar nodes: The count of nodes emitted so far.
    """

    def __init__(self):
        """Create a _BuilderRow."""
        self.nodes = 0
        self.spool = tempfile.TemporaryFile()
        self.writer = None

    def finish_node(self, pad=True):
        byte_lines, _, padding = self.writer.finish()
        if self.nodes == 0:
            # reserve space for the header of the row's first page
            self.spool.write("\x00" * _RESERVED_HEADER_BYTES)
        skipped_bytes = 0
        if not pad and padding:
            del byte_lines[-1]
            skipped_bytes = padding
        self.spool.writelines(byte_lines)
        if (self.spool.tell() + skipped_bytes) % _PAGE_SIZE != 0:
            raise AssertionError("incorrect node length")
        self.nodes += 1
        self.writer = None
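
# Spool layout sketch (illustrative, assuming the ChunkWriter contract used
# above): after each finish_node() call the spool is page-aligned, with the
# row's first node shortened by the reserved header bytes:
#
#   row's 1st page: [\x00 * _RESERVED_HEADER_BYTES][node 0 data]
#   later pages:    [one node's data, exactly _PAGE_SIZE bytes]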


class _InternalBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out internal rows."""

    def finish_node(self, pad=True):
        if not pad:
            raise AssertionError("Must pad internal nodes only.")
        _BuilderRow.finish_node(self)


class _LeafBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out leaf rows."""


class BTreeBuilder(index.GraphIndexBuilder):
    """A Builder for B+Tree based Graph indices.

    The resulting graph has the structure:

    _SIGNATURE OPTIONS NODES
    _SIGNATURE := 'B+Tree Graph Index 2' NEWLINE
    OPTIONS := REF_LISTS KEY_ELEMENTS LENGTH
    REF_LISTS := 'node_ref_lists=' DIGITS NEWLINE
    KEY_ELEMENTS := 'key_elements=' DIGITS NEWLINE
    LENGTH := 'len=' DIGITS NEWLINE
    ROW_LENGTHS := 'row_lengths=' DIGITS (COMMA DIGITS)* NEWLINE
    NODES := NODE_COMPRESSED*
    NODE_COMPRESSED := COMPRESSED_BYTES{4096}
    NODE_RAW := INTERNAL | LEAF
    INTERNAL := INTERNAL_FLAG POINTERS
    LEAF := LEAF_FLAG ROWS
    KEY_ELEMENT := Not-whitespace-utf8
    KEY := KEY_ELEMENT (NULL KEY_ELEMENT)*
    ROWS := ROW*
    ROW := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE
    ABSENT := 'a'
    REFERENCES := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1}
    REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)?
    REFERENCE := KEY
    VALUE := no-newline-no-null-bytes
    """
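
    # Usage sketch (illustrative, not from the original file): build a small
    # index with one reference list and write it to a temporary file.
    #
    #   builder = BTreeBuilder(reference_lists=1, key_elements=1)
    #   builder.add_node(('parent',), 'parent-value', ((),))
    #   builder.add_node(('child',), 'child-value', ((('parent',),),))
    #   tmp_file = builder.finish()  # file object, positioned at byte 0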

    def __init__(self, reference_lists=0, key_elements=1, spill_at=100000):
        """See GraphIndexBuilder.__init__.

        :param spill_at: Optional parameter controlling the maximum number
            of nodes that BTreeBuilder will hold in memory.
        """
        index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists,
            key_elements=key_elements)
        self._spill_at = spill_at
        self._backing_indices = []

    def add_node(self, key, value, references=()):
        """Add a node to the index.

        If adding the node causes the builder to reach its spill_at threshold,
        disk spilling will be triggered.

        :param key: The key. keys are non-empty tuples containing
            as many whitespace-free utf8 bytestrings as the key length
            defined for this index.
        :param references: An iterable of iterables of keys. Each is a
            reference to another key.
        :param value: The value to associate with the key. It may be any
            bytes as long as it does not contain \0 or \n.
        """
        index.GraphIndexBuilder.add_node(self, key, value, references=references)
        if len(self._keys) < self._spill_at:
            return
        # spill the in-memory nodes to a new backing index
        iterators_to_combine = [iter(sorted(self._iter_mem_nodes()))]
        pos = -1
        for pos, backing in enumerate(self._backing_indices):
            if backing is None:
                pos -= 1
                break
            iterators_to_combine.append(backing.iter_all_entries())
        backing_pos = pos + 1
        new_backing_file, size = \
            self._write_nodes(self._iter_smallest(iterators_to_combine))
        new_backing = BTreeGraphIndex(
            get_transport(dirname(new_backing_file.name)),
            basename(new_backing_file.name), size)
        # GC will clean up the file
        new_backing._file = new_backing_file
        if len(self._backing_indices) == backing_pos:
            self._backing_indices.append(None)
        self._backing_indices[backing_pos] = new_backing
        for pos in range(backing_pos):
            self._backing_indices[pos] = None
        self._keys = set()
        self._nodes = {}
        self._nodes_by_key = {}
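
    # Spill cascade sketch (illustrative): _backing_indices behaves like a
    # binary counter, with slot i holding roughly spill_at * 2**i keys.
    #
    #   builder = BTreeBuilder(spill_at=2)
    #   for i in range(8):
    #       builder.add_node(('key-%d' % i,), 'value')
    #   # after 8 adds: slots 0 and 1 are None, slot 2 holds all 8 keys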

    def add_nodes(self, nodes):
        """Add nodes to the index.

        :param nodes: An iterable of (key, value, node_refs) entries to add.
        """
        if self.reference_lists:
            for (key, value, node_refs) in nodes:
                self.add_node(key, value, node_refs)
        else:
            for (key, value) in nodes:
                self.add_node(key, value)

    def _iter_mem_nodes(self):
        """Iterate over the nodes held in memory."""
        if self.reference_lists:
            for key, (absent, references, value) in self._nodes.iteritems():
                if not absent:
                    yield self, key, value, references
        else:
            for key, (absent, references, value) in self._nodes.iteritems():
                if not absent:
                    yield self, key, value

    def _iter_smallest(self, iterators_to_combine):
        if len(iterators_to_combine) == 1:
            for value in iterators_to_combine[0]:
                yield value
            return
        current_values = []
        for iterator in iterators_to_combine:
            try:
                current_values.append(iterator.next())
            except StopIteration:
                current_values.append(None)
        last = None
        while True:
            # Decorate candidates with the value to allow 2.4's min to be used.
            candidates = [(item[1][1], item) for item
                          in enumerate(current_values) if item[1] is not None]
            if not len(candidates):
                return
            selected = min(candidates)
            # undecorate back to (pos, node)
            selected = selected[1]
            if last == selected[1][1]:
                raise errors.BadIndexDuplicateKey(last, self)
            last = selected[1][1]
            # Yield, with self as the index
            yield (self,) + selected[1][1:]
            pos = selected[0]
            try:
                current_values[pos] = iterators_to_combine[pos].next()
            except StopIteration:
                current_values[pos] = None
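
    # Merge sketch (illustrative): given iterators that each yield
    # (index, key, value[, refs]) tuples in sorted key order, _iter_smallest
    # yields their union in key order, and raises BadIndexDuplicateKey if
    # the same key appears in two sources.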

    def _add_key(self, string_key, line, rows):
        """Add a key to the current chunk.

        :param string_key: The key to add.
        :param line: The fully serialised key and value.
        """
        if rows[-1].writer is None:
            # opening a new leaf chunk;
            for pos, internal_row in enumerate(rows[:-1]):
                # flesh out any internal nodes that are needed to
                # preserve the height of the tree
                if internal_row.writer is None:
                    length = _PAGE_SIZE
                    if internal_row.nodes == 0:
                        length -= _RESERVED_HEADER_BYTES # padded
                    internal_row.writer = chunk_writer.ChunkWriter(length, 0)
                    internal_row.writer.write(_INTERNAL_FLAG)
                    internal_row.writer.write(_INTERNAL_OFFSET +
                        str(rows[pos + 1].nodes) + "\n")
            # add a new leaf
            length = _PAGE_SIZE
            if rows[-1].nodes == 0:
                length -= _RESERVED_HEADER_BYTES # padded
            rows[-1].writer = chunk_writer.ChunkWriter(length)
            rows[-1].writer.write(_LEAF_FLAG)
        if rows[-1].writer.write(line):
            # this key did not fit in the node:
            rows[-1].finish_node()
            key_line = string_key + "\n"
            new_row = True
            for row in reversed(rows[:-1]):
                # Mark the start of the next node in the node above. If it
                # doesn't fit then propagate upwards until we find one that
                # does fit.
                if row.writer.write(key_line):
                    row.finish_node()
                else:
                    # We've found a node that can handle the pointer.
                    new_row = False
                    break
            # If we reached the current root without being able to mark the
            # division point, then we need a new root:
            if new_row:
                # We need a new row
                if 'index' in debug.debug_flags:
                    trace.mutter('Inserting new global row.')
                new_row = _InternalBuilderRow()
                reserved_bytes = 0
                rows.insert(0, new_row)
                # This will be padded, hence subtract the reserved header bytes
                new_row.writer = chunk_writer.ChunkWriter(
                    _PAGE_SIZE - _RESERVED_HEADER_BYTES,
                    reserved_bytes)
                new_row.writer.write(_INTERNAL_FLAG)
                new_row.writer.write(_INTERNAL_OFFSET +
                    str(rows[1].nodes - 1) + "\n")
                new_row.writer.write(key_line)
            self._add_key(string_key, line, rows)

    def _write_nodes(self, node_iterator):
        """Write node_iterator out as a B+Tree.

        :param node_iterator: An iterator of sorted nodes. Each node should
            match the output given by iter_all_entries.
        :return: A file handle for a temporary file containing a B+Tree for
            the nodes.
        """
        # The index rows - rows[0] is the root, rows[1] is the layer under it
        # etc.
        rows = []
        # forward sorted by key. In future we may consider topological sorting,
        # at the cost of table scans for direct lookup, or a second index for
        # direct lookup
        key_count = 0
        # A stack with the number of nodes of each size. 0 is the root node
        # and must always be 1 (if there are any nodes in the tree).
        self.row_lengths = []
        # Loop over all nodes adding them to the bottom row
        # (rows[-1]). When we finish a chunk in a row,
        # propagate the key that didn't fit (comes after the chunk) to the
        # row above, transitively.
        for node in node_iterator:
            if key_count == 0:
                # First key triggers the first row
                rows.append(_LeafBuilderRow())
            key_count += 1
            # TODO: Flattening the node into a string key and a line should
            #       probably be put into a pyrex function. We can do a quick
            #       iter over all the entries to determine the final length,
            #       and then do a single malloc() rather than lots of
            #       intermediate mallocs as we build everything up.
            #       ATM 3 / 13s are spent flattening nodes (10s is compressing)
            string_key, line = _btree_serializer._flatten_node(node,
                                    self.reference_lists)
            self._add_key(string_key, line, rows)
        for row in reversed(rows):
            pad = (type(row) != _LeafBuilderRow)
            row.finish_node(pad=pad)
        result = tempfile.NamedTemporaryFile()
        lines = [_BTSIGNATURE]
        lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
        lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
        lines.append(_OPTION_LEN + str(key_count) + '\n')
        row_lengths = [row.nodes for row in rows]
        lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, row_lengths)) + '\n')
        result.writelines(lines)
        position = sum(map(len, lines))
        if position > _RESERVED_HEADER_BYTES:
            raise AssertionError("Could not fit the header in the"
                                 " reserved space: %d > %d"
                                 % (position, _RESERVED_HEADER_BYTES))
        # write the rows out:
        for row in rows:
            reserved = _RESERVED_HEADER_BYTES # reserved space for first node
            row.spool.flush()
            row.spool.seek(0)
            # copy nodes to the finalised file.
            # Special case the first node as it may be prefixed
            node = row.spool.read(_PAGE_SIZE)
            result.write(node[reserved:])
            result.write("\x00" * (reserved - position))
            position = 0 # Only the root row actually has an offset
            copied_len = osutils.pumpfile(row.spool, result)
            if copied_len != (row.nodes - 1) * _PAGE_SIZE:
                if type(row) != _LeafBuilderRow:
                    raise AssertionError("Not enough data copied")
        result.flush()
        size = result.tell()
        result.seek(0)
        return result, size
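
    # Resulting file layout (illustrative): every page is _PAGE_SIZE bytes.
    # Page 0 starts with the textual header; each row's first node was built
    # _RESERVED_HEADER_BYTES short, and the difference is made up with \x00
    # padding, so node i always lives at byte offset i * _PAGE_SIZE.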

    def finish(self):
        """Finalise the index.

        :return: A file handle for a temporary file containing the nodes added
            to the index.
        """
        return self._write_nodes(self.iter_all_entries())[0]

    def iter_all_entries(self):
        """Iterate over all keys within the index.

        :return: An iterable of (index, key, value, reference_lists). There is no
            defined order for the result iteration - it will be in the most
            efficient order for the index (in this case sorted key order).
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        # Doing serial rather than ordered would be faster; but this shouldn't
        # be getting called routinely anyway.
        iterators = [iter(sorted(self._iter_mem_nodes()))]
        for backing in self._backing_indices:
            if backing is not None:
                iterators.append(backing.iter_all_entries())
        if len(iterators) == 1:
            return iterators[0]
        return self._iter_smallest(iterators)

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, value, reference_lists). There is no
            defined order for the result iteration - it will be in the most
            efficient order for the index (keys iteration order in this case).
        """
        keys = set(keys)
        if self.reference_lists:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                if not node[0]:
                    yield self, key, node[2], node[1]
        else:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                if not node[0]:
                    yield self, key, node[2]
        keys.difference_update(self._keys)
        for backing in self._backing_indices:
            if backing is None:
                continue
            if not keys:
                return
            for node in backing.iter_entries(keys):
                keys.remove(node[1])
                yield (self,) + node[1:]

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        # XXX: Too much duplication with the GraphIndex class; consider finding
        # a good place to pull out the actual common logic.
        keys = set(keys)
        if not keys:
            return
        for backing in self._backing_indices:
            if backing is None:
                continue
            for node in backing.iter_entries_prefix(keys):
                yield (self,) + node[1:]
        if self._key_length == 1:
            for key in keys:
                # sanity check
                if key[0] is None:
                    raise errors.BadIndexKey(key)
                if len(key) != self._key_length:
                    raise errors.BadIndexKey(key)
                try:
                    node = self._nodes[key]
                except KeyError:
                    continue
                if node[0]:
                    # absent placeholder created by a reference
                    continue
                if self.reference_lists:
                    yield self, key, node[2], node[1]
                else:
                    yield self, key, node[2]
            return
        for key in keys:
            # sanity check
            if key[0] is None:
                raise errors.BadIndexKey(key)
            if len(key) != self._key_length:
                raise errors.BadIndexKey(key)
            # find what it refers to:
            key_dict = self._nodes_by_key
            elements = list(key)
            # find the subdict to return
            try:
                while len(elements) and elements[0] is not None:
                    key_dict = key_dict[elements[0]]
                    elements.pop(0)
            except KeyError:
                # a non-existent lookup.
                continue
            if len(elements):
                dicts = [key_dict]
                while dicts:
                    key_dict = dicts.pop(-1)
                    # can't be empty or would not exist
                    item, value = key_dict.iteritems().next()
                    if type(value) == dict:
                        # push dict down
                        dicts.extend(key_dict.itervalues())
                    else:
                        # yield keys
                        for value in key_dict.itervalues():
                            yield (self, ) + value
            else:
                yield (self, ) + key_dict

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For InMemoryGraphIndex the estimate is exact.
        """
        return len(self._keys) + sum(backing.key_count() for backing in
            self._backing_indices if backing is not None)

    def validate(self):
        """In-memory indexes have no known corruption at the moment."""


class _LeafNode(object):
    """A leaf node for a serialised B+Tree index."""

    def __init__(self, bytes, key_length, ref_list_length):
        """Parse bytes to create a leaf node object."""
        # splitlines mangles the \r delimiters; don't use it.
        self.keys = dict(_btree_serializer._parse_leaf_lines(bytes,
            key_length, ref_list_length))


class _InternalNode(object):
    """An internal node for a serialised B+Tree index."""

    def __init__(self, bytes):
        """Parse bytes to create an internal node object."""
        # splitlines mangles the \r delimiters; don't use it.
        self.keys = self._parse_lines(bytes.split('\n'))

    def _parse_lines(self, lines):
        nodes = []
        # lines[1] is the offset line, e.g. 'offset=0'
        self.offset = int(lines[1][len(_INTERNAL_OFFSET):])
        for line in lines[2:]:
            if line == '':
                break
            nodes.append(tuple(line.split('\0')))
        return nodes


class BTreeGraphIndex(object):
    """Access to nodes via the standard GraphIndex interface for B+Trees.

    Individual nodes are held in an LRU cache. This holds the root node in
    memory except when very large walks are done.
    """

    def __init__(self, transport, name, size):
        """Create a B+Tree index object on the index name.

        :param transport: The transport to read data for the index from.
        :param name: The file name of the index on transport.
        :param size: Optional size of the index in bytes. This allows
            compatibility with the GraphIndex API, as well as ensuring that
            the initial read (to read the root node header) can be done
            without over-reading even on empty indices, and on small indices
            allows single-IO to read the entire index.
        """
        self._transport = transport
        self._name = name
        self._size = size
        self._file = None
        self._page_size = transport.recommended_page_size()
        self._root_node = None
        # Default max size is 100,000 leaf values
        self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
        self._leaf_node_cache = lru_cache.LRUCache(_NODE_CACHE_SIZE)
        self._internal_node_cache = lru_cache.LRUCache()
        self._key_count = None
        self._row_lengths = None
        self._row_offsets = None # Start of each row, [-1] is the end
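
        # Cache layering (illustrative recap): the two node caches map page
        # numbers to parsed _LeafNode/_InternalNode objects, while
        # _leaf_value_cache, when enabled, maps individual keys straight to
        # (value, refs) pairs so hot keys skip the leaf pages entirely.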

    def __eq__(self, other):
        """Equal when self and other were created with the same parameters."""
        return (
            type(self) == type(other) and
            self._transport == other._transport and
            self._name == other._name and
            self._size == other._size)

    def __ne__(self, other):
        return not self.__eq__(other)

    def _get_root_node(self):
        if self._root_node is None:
            # We may not have a root node yet
            nodes = list(self._read_nodes([0]))
            if len(nodes):
                self._root_node = nodes[0][1]
        return self._root_node

    def _cache_nodes(self, nodes, cache):
        """Read nodes and cache them in the lru.

        The nodes list supplied is sorted and then read from disk, each node
        being inserted into the _node_cache.

        Note: Asking for more nodes than the _node_cache can contain will
        result in some of the results being immediately discarded; a warning
        is logged if more nodes are asked for than can be cached.

        :return: A dict of {node_pos: node}
        """
        found = {}
        if len(nodes) > cache._max_cache:
            trace.mutter('Requesting %s > %s nodes, not all will be cached',
                         len(nodes), cache._max_cache)
        for node_pos, node in self._read_nodes(sorted(nodes)):
            if node_pos == 0: # Special case
                self._root_node = node
            else:
                cache.add(node_pos, node)
            found[node_pos] = node
        return found

    def _get_nodes(self, cache, node_indexes):
        found = {}
        needed = []
        for idx in node_indexes:
            if idx == 0 and self._root_node is not None:
                found[0] = self._root_node
                continue
            try:
                found[idx] = cache[idx]
            except KeyError:
                needed.append(idx)
        found.update(self._cache_nodes(needed, cache))
        return found

    def _get_internal_nodes(self, node_indexes):
        """Get internal nodes, from cache or disk.

        After getting them, the nodes will be cached.
        """
        return self._get_nodes(self._internal_node_cache, node_indexes)

    def _get_leaf_nodes(self, node_indexes):
        """Get a bunch of nodes, from cache or disk."""
        found = self._get_nodes(self._leaf_node_cache, node_indexes)
        if self._leaf_value_cache is not None:
            for node in found.itervalues():
                for key, value in node.keys.iteritems():
                    if key in self._leaf_value_cache:
                        # Don't add the rest of the keys, we've seen this node
                        # before.
                        break
                    self._leaf_value_cache[key] = value
        return found

    def iter_all_entries(self):
        """Iterate over all keys within the index.

        :return: An iterable of (index, key, value) or (index, key, value, reference_lists).
            The former tuple is used when there are no reference lists in the
            index, making the API compatible with simple key:value index types.
            There is no defined order for the result iteration - it will be in
            the most efficient order for the index.
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        if not self.key_count():
            return
        start_of_leaves = self._row_offsets[-2]
        end_of_leaves = self._row_offsets[-1]
        needed_nodes = range(start_of_leaves, end_of_leaves)
        # We iterate strictly in-order so that we can use this function
        # for spilling index builds to disk.
        if self.node_ref_lists:
            for _, node in self._read_nodes(needed_nodes):
                for key, (value, refs) in sorted(node.keys.items()):
                    yield (self, key, value, refs)
        else:
            for _, node in self._read_nodes(needed_nodes):
                for key, (value, refs) in sorted(node.keys.items()):
                    yield (self, key, value)

    @staticmethod
    def _multi_bisect_right(in_keys, fixed_keys):
        """Find the positions where each 'in_key' would fit in fixed_keys.

        This is equivalent to doing "bisect_right" on each in_key into
        fixed_keys.

        :param in_keys: A sorted list of keys to match with fixed_keys
        :param fixed_keys: A sorted list of keys to match against
        :return: A list of (integer position, [key list]) tuples.
        """
        if not in_keys:
            return []
        if not fixed_keys:
            # no pointers in the fixed_keys list, which means everything must
            # fall to the left.
            return [(0, in_keys)]

        # TODO: Iterating both lists will generally take M + N steps
        #       Bisecting each key will generally take M * log2 N steps.
        #       If we had an efficient way to compare, we could pick the method
        #       based on which has the fewer number of steps.
        #       There is also the argument that bisect_right is a compiled
        #       function, so there is even more to be gained.
        # iter_steps = len(in_keys) + len(fixed_keys)
        # bisect_steps = len(in_keys) * math.log(len(fixed_keys), 2)
        if len(in_keys) == 1: # Bisect will always be faster for M = 1
            return [(bisect_right(fixed_keys, in_keys[0]), in_keys)]
        # elif bisect_steps < iter_steps:
        #     offsets = {}
        #     for key in in_keys:
        #         offsets.setdefault(bisect_right(fixed_keys, key),
        #                            []).append(key)
        #     return [(o, offsets[o]) for o in sorted(offsets)]
        in_keys_iter = iter(in_keys)
        fixed_keys_iter = enumerate(fixed_keys)
        cur_in_key = in_keys_iter.next()
        cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()

        class InputDone(Exception): pass
        class FixedDone(Exception): pass

        output = []
        cur_out = []

        # TODO: Another possibility is that rather than iterating on each side,
        #       we could use a combination of bisecting and iterating. For
        #       example, while cur_in_key < fixed_key, bisect to find its
        #       point, then iterate all matching keys, then bisect (restricted
        #       to only the remainder) for the next one, etc.
        try:
            while True:
                # cur_in_key and cur_fixed_key are where we currently are
                if cur_in_key < cur_fixed_key:
                    cur_keys = []
                    cur_out = (cur_fixed_offset, cur_keys)
                    output.append(cur_out)
                    while cur_in_key < cur_fixed_key:
                        cur_keys.append(cur_in_key)
                        try:
                            cur_in_key = in_keys_iter.next()
                        except StopIteration:
                            raise InputDone
                    # At this point cur_in_key must be >= cur_fixed_key
                # step the cur_fixed_key until we pass the cur key, or walk off
                # the end
                while cur_in_key >= cur_fixed_key:
                    try:
                        cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()
                    except StopIteration:
                        raise FixedDone
        except InputDone:
            # We consumed all of the input, nothing more to do
            pass
        except FixedDone:
            # There was some input left, but we consumed all of fixed, so we
            # have to add one more for the tail
            cur_keys = [cur_in_key]
            cur_keys.extend(in_keys_iter)
            cur_out = (len(fixed_keys), cur_keys)
            output.append(cur_out)

        return output
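
    # Worked example (illustrative): with two fixed keys there are three
    # possible child positions, and each in_key is grouped with its child:
    #
    #   _multi_bisect_right(['a', 'c', 'e'], ['b', 'd'])
    #   => [(0, ['a']), (1, ['c']), (2, ['e'])]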

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys supplied. No additional keys will be returned, and every
            key supplied that is in the index will be returned.
        """
        # 6 seconds spent in miss_torture using the sorted() line.
        # Even with out of order disk IO it seems faster not to sort it when
        # large queries are being made.
        # However, now that we are doing multi-way bisecting, we need the keys
        # in sorted order anyway. We could change the multi-way code to not
        # require sorted order. (For example, it bisects for the first node,
        # does an in-order search until a key comes before the current point,
        # which it then bisects for, etc.)
        keys = frozenset(keys)
        if not keys:
            return
        if not self.key_count():
            return
        needed_keys = []
        if self._leaf_value_cache is None:
            needed_keys = keys
        else:
            for key in keys:
                value = self._leaf_value_cache.get(key, None)
                if value is not None:
                    # This key is cached; yield it without touching disk
                    value, refs = value
                    if self.node_ref_lists:
                        yield (self, key, value, refs)
                    else:
                        yield (self, key, value)
                else:
                    needed_keys.append(key)
        if not needed_keys:
            return
        needed_keys = sorted(needed_keys)

        nodes_and_keys = [(0, needed_keys)]

        for row_pos, next_row_start in enumerate(self._row_offsets[1:-1]):
            node_indexes = [idx for idx, s_keys in nodes_and_keys]
            nodes = self._get_internal_nodes(node_indexes)

            next_nodes_and_keys = []
            for node_index, sub_keys in nodes_and_keys:
                node = nodes[node_index]
                positions = self._multi_bisect_right(sub_keys, node.keys)
                node_offset = next_row_start + node.offset
                next_nodes_and_keys.extend([(node_offset + pos, s_keys)
                                            for pos, s_keys in positions])
            nodes_and_keys = next_nodes_and_keys
        # We should now be at the _LeafNodes
        node_indexes = [idx for idx, s_keys in nodes_and_keys]

        # TODO: We may *not* want to always read all the nodes in one
        #       big go. Consider setting a max size on this.

        nodes = self._get_leaf_nodes(node_indexes)
        for node_index, sub_keys in nodes_and_keys:
            if not sub_keys:
                continue
            node = nodes[node_index]
            for next_sub_key in sub_keys:
                if next_sub_key in node.keys:
                    value, refs = node.keys[next_sub_key]
                    if self.node_ref_lists:
                        yield (self, next_sub_key, value, refs)
                    else:
                        yield (self, next_sub_key, value)
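
    # Descent sketch (illustrative): the still-wanted keys are pushed down
    # one row at a time; _multi_bisect_right splits them across the children
    # of each internal node, so looking up K keys reads at most K leaf pages
    # plus the internal pages on their paths.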

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        WARNING: Note that this method currently causes a full index parse
        unconditionally (which is reasonably appropriate as it is a means for
        thunking many small indices into one larger one and still supplies
        iter_all_entries at the thunk layer).

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        keys = sorted(set(keys))
        if not keys:
            return
        # Load if needed to check key lengths
        if self._key_count is None:
            self._get_root_node()
        # TODO: only access nodes that can satisfy the prefixes we are looking
        # for. For now, to meet API usage (as this function is not used by
        # current bzrlib) just suck the entire index and iterate in memory.
        nodes = {}
        if self.node_ref_lists:
            if self._key_length == 1:
                for _1, key, value, refs in self.iter_all_entries():
                    nodes[key] = value, refs
            else:
                nodes_by_key = {}
                for _1, key, value, refs in self.iter_all_entries():
                    key_value = key, value, refs
                    # For a key of (foo, bar, baz) create
                    # _nodes_by_key[foo][bar][baz] = key_value
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key_value
        else:
            if self._key_length == 1:
                for _1, key, value in self.iter_all_entries():
                    nodes[key] = value
            else:
                nodes_by_key = {}
                for _1, key, value in self.iter_all_entries():
                    key_value = key, value
                    # For a key of (foo, bar, baz) create
                    # _nodes_by_key[foo][bar][baz] = key_value
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key_value
        if self._key_length == 1:
            for key in keys:
                # sanity check
                if key[0] is None:
                    raise errors.BadIndexKey(key)
                if len(key) != self._key_length:
                    raise errors.BadIndexKey(key)
                try:
                    if self.node_ref_lists:
                        value, node_refs = nodes[key]
                        yield self, key, value, node_refs
                    else:
                        yield self, key, nodes[key]
                except KeyError:
                    pass
            return
        for key in keys:
            # sanity check
            if key[0] is None:
                raise errors.BadIndexKey(key)
            if len(key) != self._key_length:
                raise errors.BadIndexKey(key)
            # find what it refers to:
            key_dict = nodes_by_key
            elements = list(key)
            # find the subdict whose contents should be returned.
            try:
                while len(elements) and elements[0] is not None:
                    key_dict = key_dict[elements[0]]
                    elements.pop(0)
            except KeyError:
                # a non-existent lookup.
                continue
            if len(elements):
                dicts = [key_dict]
                while dicts:
                    key_dict = dicts.pop(-1)
                    # can't be empty or would not exist
                    item, value = key_dict.iteritems().next()
                    if type(value) == dict:
                        # push dict down
                        dicts.extend(key_dict.itervalues())
                    else:
                        # yield keys
                        for value in key_dict.itervalues():
                            # each value is the key:value:node refs tuple
                            # ready to yield
                            yield (self, ) + value
            else:
                # the last thing looked up was a terminal element
                yield (self, ) + key_dict

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For BTreeGraphIndex the estimate is exact as it is contained in the
        header.
        """
        if self._key_count is None:
            self._get_root_node()
        return self._key_count

    def _parse_header_from_bytes(self, bytes):
        """Parse the header from a region of bytes.

        :param bytes: The data to parse.
        :return: An offset, data tuple such as readv yields, for the unparsed
            data. (which may be of length 0).
        """
        signature = bytes[0:len(self._signature())]
        if not signature == self._signature():
            raise errors.BadIndexFormatSignature(self._name, BTreeGraphIndex)
        lines = bytes[len(self._signature()):].splitlines()
        options_line = lines[0]
        if not options_line.startswith(_OPTION_NODE_REFS):
            raise errors.BadIndexOptions(self)
        try:
            self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[1]
        if not options_line.startswith(_OPTION_KEY_ELEMENTS):
            raise errors.BadIndexOptions(self)
        try:
            self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[2]
        if not options_line.startswith(_OPTION_LEN):
            raise errors.BadIndexOptions(self)
        try:
            self._key_count = int(options_line[len(_OPTION_LEN):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[3]
        if not options_line.startswith(_OPTION_ROW_LENGTHS):
            raise errors.BadIndexOptions(self)
        try:
            self._row_lengths = map(int, [length for length in
                options_line[len(_OPTION_ROW_LENGTHS):].split(',')
                if length not in ('', None)])
        except ValueError:
            raise errors.BadIndexOptions(self)
        offsets = []
        row_offset = 0
        for row in self._row_lengths:
            offsets.append(row_offset)
            row_offset += row
        offsets.append(row_offset)
        self._row_offsets = offsets

        # calculate the bytes we have processed
        header_end = (len(signature) + sum(map(len, lines[0:4])) + 4)
        return header_end, bytes[header_end:]
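
    # Example header this parses (illustrative, for a 100-key, 2-row index):
    #
    #   B+Tree Graph Index 2\n
    #   node_ref_lists=1\n
    #   key_elements=1\n
    #   len=100\n
    #   row_lengths=1,1\n
    #
    # giving _row_offsets == [0, 1, 2].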

    def _read_nodes(self, nodes):
        """Read some nodes from disk into the LRU cache.

        This performs a readv to get the node data into memory, and parses
        each node, then yields it to the caller. The nodes are requested in
        the supplied order. If possible doing sort() on the list before
        requesting a read may improve performance.

        :param nodes: The nodes to read. 0 - first node, 1 - second node etc.
        :return: None
        """
        ranges = []
        for index in nodes:
            offset = index * _PAGE_SIZE
            size = _PAGE_SIZE
            if index == 0:
                # Root node - special case
                if self._size:
                    size = min(_PAGE_SIZE, self._size)
                else:
                    stream = self._transport.get(self._name)
                    start = stream.read(_PAGE_SIZE)
                    # Avoid doing this again
                    self._size = len(start)
                    size = min(_PAGE_SIZE, self._size)
            else:
                size = min(size, self._size - offset)
            ranges.append((offset, size))
        if not ranges:
            return
        if self._file is None:
            data_ranges = self._transport.readv(self._name, ranges)
        else:
            data_ranges = []
            for offset, size in ranges:
                self._file.seek(offset)
                data_ranges.append((offset, self._file.read(size)))
        for offset, data in data_ranges:
            if offset == 0:
                # extract the header
                offset, data = self._parse_header_from_bytes(data)
                if len(data) == 0:
                    continue
            bytes = zlib.decompress(data)
            if bytes.startswith(_LEAF_FLAG):
                node = _LeafNode(bytes, self._key_length, self.node_ref_lists)
            elif bytes.startswith(_INTERNAL_FLAG):
                node = _InternalNode(bytes)
            else:
                raise AssertionError("Unknown node type for %r" % bytes)
            yield offset / _PAGE_SIZE, node
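
    # Page addressing recap (illustrative): node i occupies bytes
    # [i * _PAGE_SIZE, (i + 1) * _PAGE_SIZE); page 0 additionally carries
    # the textual header, which _parse_header_from_bytes strips before the
    # zlib payload is inflated.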

    def _signature(self):
        """The file signature for this index type."""
        return _BTSIGNATURE

    def validate(self):
        """Validate that everything in the index can be accessed."""
        # just read and parse every node.
        self._get_root_node()
        if len(self._row_lengths) > 1:
            start_node = self._row_offsets[1]
        else:
            # We shouldn't be reading anything anyway
            start_node = 1
        node_end = self._row_offsets[-1]
        for node in self._read_nodes(range(start_node, node_end)):
            pass


try:
    from bzrlib import _btree_serializer_c as _btree_serializer
except ImportError:
    from bzrlib import _btree_serializer_py as _btree_serializer