# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""B+Tree indices"""

from bisect import bisect_right
from copy import deepcopy
import math
import tempfile
import zlib

from bzrlib import (
    chunk_writer,
    debug,
    errors,
    index,
    lru_cache,
    osutils,
    trace,
    )
from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
from bzrlib.transport import get_transport

_BTSIGNATURE = "B+Tree Graph Index 2\n"
_OPTION_ROW_LENGTHS = "row_lengths="
_LEAF_FLAG = "type=leaf\n"
_INTERNAL_FLAG = "type=internal\n"
_INTERNAL_OFFSET = "offset="

_RESERVED_HEADER_BYTES = 120
_PAGE_SIZE = 4096

# 4K per page: 4MB - 1000 entries
_NODE_CACHE_SIZE = 1000


class _BuilderRow(object):
    """The stored state accumulated while writing out a row in the index.

    :ivar spool: A temporary file used to accumulate nodes for this row
        in the index.
    :ivar nodes: The count of nodes emitted so far.
    """

    def __init__(self):
        """Create a _BuilderRow."""
        self.nodes = 0
        self.spool = tempfile.TemporaryFile()
        self.writer = None

    def finish_node(self, pad=True):
        byte_lines, _, padding = self.writer.finish()
        if self.nodes == 0:
            # reserve space for the header in the first node of the row
            self.spool.write("\x00" * _RESERVED_HEADER_BYTES)
        skipped_bytes = 0
        if not pad and padding:
            del byte_lines[-1]
            skipped_bytes = padding
        self.spool.writelines(byte_lines)
        remainder = (self.spool.tell() + skipped_bytes) % _PAGE_SIZE
        if remainder != 0:
            raise AssertionError("incorrect node length: %d, %d"
                                 % (self.spool.tell(), remainder))
        self.nodes += 1
        self.writer = None


class _InternalBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out internal rows."""

    def finish_node(self, pad=True):
        if not pad:
            raise AssertionError("Must pad internal nodes only.")
        _BuilderRow.finish_node(self)


class _LeafBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out leaf rows."""


class BTreeBuilder(index.GraphIndexBuilder):
    """A Builder for B+Tree based Graph indices.

    The resulting graph has the structure:

    _SIGNATURE OPTIONS NODES
    _SIGNATURE := 'B+Tree Graph Index 2' NEWLINE
    OPTIONS := REF_LISTS KEY_ELEMENTS LENGTH ROW_LENGTHS
    REF_LISTS := 'node_ref_lists=' DIGITS NEWLINE
    KEY_ELEMENTS := 'key_elements=' DIGITS NEWLINE
    LENGTH := 'len=' DIGITS NEWLINE
    ROW_LENGTHS := 'row_lengths=' DIGITS (COMMA DIGITS)* NEWLINE
    NODES := NODE_COMPRESSED*
    NODE_COMPRESSED:= COMPRESSED_BYTES{4096}
    NODE_RAW := INTERNAL | LEAF
    INTERNAL := INTERNAL_FLAG POINTERS
    LEAF := LEAF_FLAG ROWS
    KEY_ELEMENT := Not-whitespace-utf8
    KEY := KEY_ELEMENT (NULL KEY_ELEMENT)*
    ROWS := ROW*
    ROW := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE
    ABSENT := 'a'
    REFERENCES := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1}
    REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)?
    REFERENCE := KEY
    VALUE := no-newline-no-null-bytes
    """
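
    # A minimal usage sketch (hypothetical keys and values, inferred from the
    # docstrings of add_node() and finish() below rather than taken from the
    # original file):
    #
    #   builder = BTreeBuilder(reference_lists=1, key_elements=1)
    #   builder.add_node(('key-a',), 'value-a', ([('parent-key',)],))
    #   tmp_file = builder.finish()  # temporary file holding the B+Tree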

    def __init__(self, reference_lists=0, key_elements=1, spill_at=100000):
        """See GraphIndexBuilder.__init__.

        :param spill_at: Optional parameter controlling the maximum number
            of nodes that BTreeBuilder will hold in memory.
        """
        index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists,
            key_elements=key_elements)
        self._spill_at = spill_at
        self._backing_indices = []
        # A map of {key: (node_refs, value)}
        self._nodes = {}
        # Indicate it hasn't been built yet
        self._nodes_by_key = None

    def add_node(self, key, value, references=()):
        """Add a node to the index.

        If adding the node causes the builder to reach its spill_at threshold,
        disk spilling will be triggered.

        :param key: The key. keys are non-empty tuples containing
            as many whitespace-free utf8 bytestrings as the key length
            defined for this index.
        :param references: An iterable of iterables of keys. Each is a
            reference to another key.
        :param value: The value to associate with the key. It may be any
            bytes as long as it does not contain \0 or \n.
        """
        # we don't care about absent_references
        node_refs, _ = self._check_key_ref_value(key, references, value)
        if key in self._nodes:
            raise errors.BadIndexDuplicateKey(key, self)
        self._nodes[key] = (node_refs, value)
        self._keys.add(key)
        if self._nodes_by_key is not None and self._key_length > 1:
            self._update_nodes_by_key(key, value, node_refs)
        if len(self._keys) < self._spill_at:
            return
        self._spill_mem_keys_to_disk()

    def _spill_mem_keys_to_disk(self):
        """Write the in memory keys down to disk to cap memory consumption.

        If we already have some keys written to disk, we will combine them so
        as to preserve the sorted order. The algorithm for combining uses
        powers of two. So on the first spill, write all mem nodes into a
        single index. On the second spill, combine the mem nodes with the nodes
        on disk to create a 2x sized disk index and get rid of the first index.
        On the third spill, create a single new disk index, which will contain
        the mem nodes, and preserve the existing 2x sized index. On the fourth,
        combine mem with the first and second indexes, creating a new one of
        size 4x. On the fifth create a single new one, etc.
        """
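        # Illustrative schedule of the powers-of-two combining described
        # above (sizes in units of one in-memory spill; None marks a slot
        # freed after its contents were merged into a larger index):
        #   after spill 1: [1]
        #   after spill 2: [None, 2]
        #   after spill 3: [1, 2]
        #   after spill 4: [None, None, 4]
        #   after spill 5: [1, None, 4]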
        iterators_to_combine = [self._iter_mem_nodes()]
        pos = -1
        for pos, backing in enumerate(self._backing_indices):
            if backing is None:
                pos -= 1
                break
            iterators_to_combine.append(backing.iter_all_entries())
        backing_pos = pos + 1
        new_backing_file, size = \
            self._write_nodes(self._iter_smallest(iterators_to_combine))
        dir_path, base_name = osutils.split(new_backing_file.name)
        # Note: The transport here isn't strictly needed, because we will use
        #       direct access to the new_backing._file object
        new_backing = BTreeGraphIndex(get_transport(dir_path),
                                      base_name, size)
        # GC will clean up the file
        new_backing._file = new_backing_file
        if len(self._backing_indices) == backing_pos:
            self._backing_indices.append(None)
        self._backing_indices[backing_pos] = new_backing
        for pos in range(backing_pos):
            self._backing_indices[pos] = None
        self._keys = set()
        self._nodes = {}
        self._nodes_by_key = None

    def add_nodes(self, nodes):
        """Add nodes to the index.

        :param nodes: An iterable of (key, value, node_refs) entries to add.
        """
        if self.reference_lists:
            for (key, value, node_refs) in nodes:
                self.add_node(key, value, node_refs)
        else:
            for (key, value) in nodes:
                self.add_node(key, value)

    def _iter_mem_nodes(self):
        """Iterate over the nodes held in memory."""
        nodes = self._nodes
        if self.reference_lists:
            for key in sorted(nodes):
                references, value = nodes[key]
                yield self, key, value, references
        else:
            for key in sorted(nodes):
                references, value = nodes[key]
                yield self, key, value

    def _iter_smallest(self, iterators_to_combine):
        if len(iterators_to_combine) == 1:
            for value in iterators_to_combine[0]:
                yield value
            return
        current_values = []
        for iterator in iterators_to_combine:
            try:
                current_values.append(iterator.next())
            except StopIteration:
                current_values.append(None)
        last = None
        while True:
            # Decorate candidates with the value to allow 2.4's min to be used.
            candidates = [(item[1][1], item) for item
                          in enumerate(current_values) if item[1] is not None]
            if not len(candidates):
                return
            selected = min(candidates)
            # undecorate back to (pos, node)
            selected = selected[1]
            if last == selected[1][1]:
                raise errors.BadIndexDuplicateKey(last, self)
            last = selected[1][1]
            # Yield, with self as the index
            yield (self,) + selected[1][1:]
            pos = selected[0]
            try:
                current_values[pos] = iterators_to_combine[pos].next()
            except StopIteration:
                current_values[pos] = None

    def _add_key(self, string_key, line, rows):
        """Add a key to the current chunk.

        :param string_key: The key to add.
        :param line: The fully serialised key and value.
        """
        if rows[-1].writer is None:
            # opening a new leaf chunk;
            for pos, internal_row in enumerate(rows[:-1]):
                # flesh out any internal nodes that are needed to
                # preserve the height of the tree
                if internal_row.writer is None:
                    length = _PAGE_SIZE
                    if internal_row.nodes == 0:
                        length -= _RESERVED_HEADER_BYTES # padded
                    internal_row.writer = chunk_writer.ChunkWriter(length, 0)
                    internal_row.writer.write(_INTERNAL_FLAG)
                    internal_row.writer.write(_INTERNAL_OFFSET +
                        str(rows[pos + 1].nodes) + "\n")
            length = _PAGE_SIZE
            if rows[-1].nodes == 0:
                length -= _RESERVED_HEADER_BYTES # padded
            rows[-1].writer = chunk_writer.ChunkWriter(length)
            rows[-1].writer.write(_LEAF_FLAG)
        if rows[-1].writer.write(line):
            # this key did not fit in the node:
            rows[-1].finish_node()
            key_line = string_key + "\n"
            new_row = True
            for row in reversed(rows[:-1]):
                # Mark the start of the next node in the node above. If it
                # doesn't fit then propagate upwards until we find one that
                # does fit.
                if row.writer.write(key_line):
                    row.finish_node()
                else:
                    # We've found a node that can handle the pointer.
                    new_row = False
                    break
            # If we reached the current root without being able to mark the
            # division point, then we need a new root:
            if new_row:
                if 'index' in debug.debug_flags:
                    trace.mutter('Inserting new global row.')
                new_row = _InternalBuilderRow()
                rows.insert(0, new_row)
                # This will be padded, hence the - _RESERVED_HEADER_BYTES
                new_row.writer = chunk_writer.ChunkWriter(
                    _PAGE_SIZE - _RESERVED_HEADER_BYTES,
                    0)
                new_row.writer.write(_INTERNAL_FLAG)
                new_row.writer.write(_INTERNAL_OFFSET +
                    str(rows[1].nodes - 1) + "\n")
                new_row.writer.write(key_line)
            self._add_key(string_key, line, rows)

    def _write_nodes(self, node_iterator):
        """Write node_iterator out as a B+Tree.

        :param node_iterator: An iterator of sorted nodes. Each node should
            match the output given by iter_all_entries.
        :return: A file handle for a temporary file containing a B+Tree for
            the nodes.
        """
        # The index rows - rows[0] is the root, rows[1] is the layer under it
        # etc.
        rows = []
        # forward sorted by key. In future we may consider topological sorting,
        # at the cost of table scans for direct lookup, or a second index for
        # direct lookup
        key_count = 0
        # A stack with the number of nodes of each size. 0 is the root node
        # and must always be 1 (if there are any nodes in the tree).
        self.row_lengths = []
        # Loop over all nodes adding them to the bottom row
        # (rows[-1]). When we finish a chunk in a row,
        # propagate the key that didn't fit (comes after the chunk) to the
        # row above, transitively.
        for node in node_iterator:
            if key_count == 0:
                # First key triggers the first row
                rows.append(_LeafBuilderRow())
            key_count += 1
            string_key, line = _btree_serializer._flatten_node(node,
                                    self.reference_lists)
            self._add_key(string_key, line, rows)
        for row in reversed(rows):
            pad = (type(row) != _LeafBuilderRow)
            row.finish_node(pad=pad)
        result = tempfile.NamedTemporaryFile()
        lines = [_BTSIGNATURE]
        lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
        lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
        lines.append(_OPTION_LEN + str(key_count) + '\n')
        row_lengths = [row.nodes for row in rows]
        lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, row_lengths)) + '\n')
        result.writelines(lines)
        position = sum(map(len, lines))
        if position > _RESERVED_HEADER_BYTES:
            raise AssertionError("Could not fit the header in the"
                                 " reserved space: %d > %d"
                                 % (position, _RESERVED_HEADER_BYTES))
        # write the rows out:
        for row in rows:
            reserved = _RESERVED_HEADER_BYTES # reserved space for first node
            row.spool.flush()
            row.spool.seek(0)
            # copy nodes to the finalised file.
            # Special case the first node as it may be prefixed
            node = row.spool.read(_PAGE_SIZE)
            result.write(node[reserved:])
            result.write("\x00" * (reserved - position))
            position = 0 # Only the root row actually has an offset
            copied_len = osutils.pumpfile(row.spool, result)
            if copied_len != (row.nodes - 1) * _PAGE_SIZE:
                if type(row) != _LeafBuilderRow:
                    raise AssertionError("Incorrect amount of data copied"
                        " expected: %d, got: %d"
                        % ((row.nodes - 1) * _PAGE_SIZE,
                           copied_len))
        result.flush()
        size = result.tell()
        result.seek(0)
        return result, size

    def finish(self):
        """Finalise the index.

        :return: A file handle for a temporary file containing the nodes added
            to the index.
        """
        return self._write_nodes(self.iter_all_entries())[0]

    def iter_all_entries(self):
        """Iterate over all keys within the index.

        :return: An iterable of (index, key, value, reference_lists). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (in this case dictionary hash order).
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        # Doing serial rather than ordered would be faster; but this shouldn't
        # be getting called routinely anyway.
        iterators = [self._iter_mem_nodes()]
        for backing in self._backing_indices:
            if backing is not None:
                iterators.append(backing.iter_all_entries())
        if len(iterators) == 1:
            return iterators[0]
        return self._iter_smallest(iterators)

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, value, reference_lists). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (keys iteration order in this case).
        """
        keys = set(keys)
        if self.reference_lists:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                yield self, key, node[1], node[0]
        else:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                yield self, key, node[1]
        keys.difference_update(self._keys)
        for backing in self._backing_indices:
            if backing is None:
                continue
            if not keys:
                return
            for node in backing.iter_entries(keys):
                keys.remove(node[1])
                yield (self,) + node[1:]

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        # XXX: Too much duplication with the GraphIndex class; consider
        # finding a good place to pull out the actual common logic.
        keys = set(keys)
        if not keys:
            return
        for backing in self._backing_indices:
            if backing is None:
                continue
            for node in backing.iter_entries_prefix(keys):
                yield (self,) + node[1:]
        if self._key_length == 1:
            for key in keys:
                # sanity check
                if key[0] is None:
                    raise errors.BadIndexKey(key)
                if len(key) != self._key_length:
                    raise errors.BadIndexKey(key)
                try:
                    node = self._nodes[key]
                except KeyError:
                    continue
                if self.reference_lists:
                    yield self, key, node[1], node[0]
                else:
                    yield self, key, node[1]
            return
        for key in keys:
            # sanity check
            if key[0] is None:
                raise errors.BadIndexKey(key)
            if len(key) != self._key_length:
                raise errors.BadIndexKey(key)
            # find what it refers to:
            key_dict = self._get_nodes_by_key()
            elements = list(key)
            # find the subdict to return
            try:
                while len(elements) and elements[0] is not None:
                    key_dict = key_dict[elements[0]]
                    elements.pop(0)
            except KeyError:
                # a non-existent lookup.
                continue
            if len(elements):
                dicts = [key_dict]
                while dicts:
                    key_dict = dicts.pop(-1)
                    # can't be empty or would not exist
                    item, value = key_dict.iteritems().next()
                    if type(value) == dict:
                        # push dict down
                        dicts.extend(key_dict.itervalues())
                    else:
                        # yield keys
                        for value in key_dict.itervalues():
                            yield (self, ) + value
            else:
                yield (self, ) + key_dict

    def _get_nodes_by_key(self):
        if self._nodes_by_key is None:
            nodes_by_key = {}
            if self.reference_lists:
                for key, (references, value) in self._nodes.iteritems():
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value, references
            else:
                for key, (references, value) in self._nodes.iteritems():
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key, value
            self._nodes_by_key = nodes_by_key
        return self._nodes_by_key

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For InMemoryGraphIndex the estimate is exact.
        """
        return len(self._keys) + sum(backing.key_count() for backing in
            self._backing_indices if backing is not None)

    def validate(self):
        """In memory indices have no known corruption at the moment."""


class _LeafNode(object):
    """A leaf node for a serialised B+Tree index."""

    def __init__(self, bytes, key_length, ref_list_length):
        """Parse bytes to create a leaf node object."""
        # splitlines mangles the \r delimiters.. don't use it.
        self.keys = dict(_btree_serializer._parse_leaf_lines(bytes,
            key_length, ref_list_length))


class _InternalNode(object):
    """An internal node for a serialised B+Tree index."""

    def __init__(self, bytes):
        """Parse bytes to create an internal node object."""
        # splitlines mangles the \r delimiters.. don't use it.
        self.keys = self._parse_lines(bytes.split('\n'))

    def _parse_lines(self, lines):
        nodes = []
        # lines[1] is the 'offset=' line; 7 == len(_INTERNAL_OFFSET)
        self.offset = int(lines[1][7:])
        for line in lines[2:]:
            if line == '':
                break
            nodes.append(tuple(line.split('\0')))
        return nodes


class BTreeGraphIndex(object):
    """Access to nodes via the standard GraphIndex interface for B+Trees.

    Individual nodes are held in an LRU cache. This holds the root node in
    memory except when very large walks are done.
    """
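
    # A minimal usage sketch (hypothetical file name and size; such a file
    # would normally be produced by BTreeBuilder.finish()):
    #
    #   transport = get_transport('/path/to/dir')
    #   idx = BTreeGraphIndex(transport, 'example.bix', size)
    #   for entry in idx.iter_entries([('key-a',)]):
    #       ...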

    def __init__(self, transport, name, size):
        """Create a B+Tree index object on the index name.

        :param transport: The transport to read data for the index from.
        :param name: The file name of the index on transport.
        :param size: Optional size of the index in bytes. This allows
            compatibility with the GraphIndex API, as well as ensuring that
            the initial read (to read the root node header) can be done
            without over-reading even on empty indices, and on small indices
            allows single-IO to read the entire index.
        """
        self._transport = transport
        self._name = name
        self._size = size
        self._file = None
        self._recommended_pages = self._compute_recommended_pages()
        self._root_node = None
        # Default max size is 100,000 leaf values
        self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
        self._leaf_node_cache = lru_cache.LRUCache(_NODE_CACHE_SIZE)
        self._internal_node_cache = lru_cache.LRUCache()
        self._key_count = None
        self._row_lengths = None
        self._row_offsets = None # Start of each row, [-1] is the end

    def __eq__(self, other):
        """Equal when self and other were created with the same parameters."""
        return (
            type(self) == type(other) and
            self._transport == other._transport and
            self._name == other._name and
            self._size == other._size)

    def __ne__(self, other):
        return not self.__eq__(other)

    def _get_and_cache_nodes(self, nodes):
        """Read nodes and cache them in the lru.

        The nodes list supplied is sorted and then read from disk, each node
        being inserted into the _node_cache.

        Note: Asking for more nodes than the _node_cache can contain will
        result in some of the results being immediately discarded; to prevent
        this, an assertion is raised if more nodes are asked for than are
        cachable.

        :return: A dict of {node_pos: node}
        """
        found = {}
        start_of_leaves = None
        for node_pos, node in self._read_nodes(sorted(nodes)):
            if node_pos == 0: # Special case
                self._root_node = node
            else:
                if start_of_leaves is None:
                    start_of_leaves = self._row_offsets[-2]
                if node_pos < start_of_leaves:
                    self._internal_node_cache.add(node_pos, node)
                else:
                    self._leaf_node_cache.add(node_pos, node)
            found[node_pos] = node
        return found

    def _compute_recommended_pages(self):
        """Given the transport's recommended_page_size, determine num pages."""
        recommended_read = self._transport.recommended_page_size()
        recommended_pages = int(math.ceil(recommended_read /
                                          float(_PAGE_SIZE)))
        return recommended_pages

    def _compute_num_pages(self):
        """Determine the offset to the last page in this index."""
        if self._size is None:
            raise AssertionError('_compute_num_pages should not be called'
                                 ' when self._size is None')
        if self._root_node is not None:
            # This is the number of pages as defined by the header
            return self._row_offsets[-1]
        # This is the number of pages as defined by the size of the index. They
        # should be identical.
        num_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
        return num_pages

    def _expand_nodes(self, node_indexes):
        """Find extra nodes to download.

        The idea is that we always want to make big-enough requests (like 64kB
        for http), so that we don't waste round trips. So given the entries
        that we already have cached, and the new nodes being downloaded, figure
        out what other pages we might want to read.

        :param node_indexes: The nodes that have been requested to read.
        :return: A new list of nodes to download.
        """
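        # For example (illustrative numbers only): with _recommended_pages=16,
        # a request for page [5] of a fully unread 10-page index expands to
        # every page, since a single recommended-size read covers the whole
        # file.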
        if 'index' in debug.debug_flags:
            trace.mutter('expanding: %s\tnodes: %s', self._name, node_indexes)

        if len(node_indexes) >= self._recommended_pages:
            # Don't add more, we are already requesting more than enough
            if 'index' in debug.debug_flags:
                trace.mutter(' not expanding large request (%s >= %s)',
                             len(node_indexes), self._recommended_pages)
            return node_indexes
        if self._size is None:
            # Don't try anything, because we don't know where the file ends
            if 'index' in debug.debug_flags:
                trace.mutter(' not expanding without knowing index size')
            return node_indexes
        num_pages = self._compute_num_pages()
        # The idea here is to get nodes "next to" the ones we are already
        # reading.
        cached_keys = self._get_cached_keys()

        # If reading recommended_pages would read the rest of the index, just
        # read the whole thing.
        if num_pages - len(cached_keys) <= self._recommended_pages:
            # Read whatever is left
            if cached_keys:
                expanded = [x for x in xrange(num_pages)
                            if x not in cached_keys]
            else:
                expanded = range(num_pages)
            if 'index' in debug.debug_flags:
                trace.mutter(' reading all unread pages: %s', expanded)
            return expanded

        if self._root_node is None:
            # ATM on the first read of the root node of a large index, we don't
            # bother pre-reading any other pages. This is because the
            # likelihood of actually reading interesting pages is very low.
            # See doc/developers/btree_index_prefetch.txt for a discussion, and
            # a possible implementation when we are guessing that the second
            # layer index is small
            final_nodes = node_indexes
        else:
            tree_depth = len(self._row_lengths)
            if len(cached_keys) < tree_depth and len(node_indexes) == 1:
                # We haven't read enough to justify expansion
                # If we are only going to read the root node, and 1 leaf node,
                # then it isn't worth expanding our request. Once we've read at
                # least 2 nodes, then we are probably doing a search, and we
                # start expanding our requests.
                if 'index' in debug.debug_flags:
                    trace.mutter(' not expanding on first reads')
                return node_indexes

            # Expand requests to neighbors until we have at least
            # recommended_pages to request. We only want to expand requests
            # within a given layer. We cheat a little bit and assume all
            # requests will be in the same layer. This is true given the
            # current design, but if it changes this algorithm may perform
            # poorly.
            final_nodes = set(node_indexes)
            first = end = None
            new_tips = set(final_nodes)
            while len(final_nodes) < self._recommended_pages and new_tips:
                next_tips = set()
                for pos in new_tips:
                    if first is None:
                        first, end = self._find_layer_first_and_end(pos)
                    previous = pos - 1
                    if (previous > 0
                        and previous not in cached_keys
                        and previous not in final_nodes
                        and previous >= first):
                        next_tips.add(previous)
                    after = pos + 1
                    if (after < num_pages
                        and after not in cached_keys
                        and after not in final_nodes
                        and after < end):
                        next_tips.add(after)
                    # This would keep us from going bigger than
                    # recommended_pages by only expanding the first nodes.
                    # However, if we are making a 'wide' request, it is
                    # reasonable to expand all points equally.
                    # if len(final_nodes) > recommended_pages:
                    #     break
                final_nodes.update(next_tips)
                new_tips = next_tips

        final_nodes = sorted(final_nodes)
        if 'index' in debug.debug_flags:
            trace.mutter('expanded: %s', final_nodes)
        return final_nodes

    def _find_layer_first_and_end(self, offset):
        """Find the start/stop nodes for the layer corresponding to offset.

        :return: (first, end)
            first is the first node in this layer
            end is the first node of the next layer
        """
        first = end = 0
        for roffset in self._row_offsets:
            first = end
            end = roffset
            if offset < roffset:
                break
        return first, end

    def _get_cached_keys(self):
        """Determine what nodes we already have cached."""
        cached_keys = set(self._internal_node_cache.keys())
        cached_keys.update(self._leaf_node_cache.keys())
        if self._root_node is not None:
            cached_keys.add(0)
        return cached_keys

    def _get_root_node(self):
        if self._root_node is None:
            # We may not have a root node yet
            self._get_internal_nodes([0])
        return self._root_node

    def _get_nodes(self, cache, node_indexes):
        found = {}
        needed = []
        for idx in node_indexes:
            if idx == 0 and self._root_node is not None:
                found[0] = self._root_node
                continue
            try:
                found[idx] = cache[idx]
            except KeyError:
                needed.append(idx)
        if not needed:
            return found
        needed = self._expand_nodes(needed)
        found.update(self._get_and_cache_nodes(needed))
        return found

    def _get_internal_nodes(self, node_indexes):
        """Get a node, from cache or disk.

        After getting it, the node will be cached.
        """
        return self._get_nodes(self._internal_node_cache, node_indexes)

    def _get_leaf_nodes(self, node_indexes):
        """Get a bunch of nodes, from cache or disk."""
        found = self._get_nodes(self._leaf_node_cache, node_indexes)
        if self._leaf_value_cache is not None:
            for node in found.itervalues():
                for key, value in node.keys.iteritems():
                    if key in self._leaf_value_cache:
                        # Don't add the rest of the keys, we've seen this node
                        break
                    self._leaf_value_cache[key] = value
        return found

    def iter_all_entries(self):
        """Iterate over all keys within the index.

        :return: An iterable of (index, key, value) or
            (index, key, value, reference_lists).
            The former tuple is used when there are no reference lists in the
            index, making the API compatible with simple key:value index types.
            There is no defined order for the result iteration - it will be in
            the most efficient order for the index.
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        if not self.key_count():
            return
        start_of_leaves = self._row_offsets[-2]
        end_of_leaves = self._row_offsets[-1]
        needed_nodes = range(start_of_leaves, end_of_leaves)
        # We iterate strictly in-order so that we can use this function
        # for spilling index builds to disk.
        if self.node_ref_lists:
            for _, node in self._read_nodes(needed_nodes):
                for key, (value, refs) in sorted(node.keys.items()):
                    yield (self, key, value, refs)
        else:
            for _, node in self._read_nodes(needed_nodes):
                for key, (value, refs) in sorted(node.keys.items()):
                    yield (self, key, value)

    @staticmethod
    def _multi_bisect_right(in_keys, fixed_keys):
        """Find the positions where each 'in_key' would fit in fixed_keys.

        This is equivalent to doing "bisect_right" on each in_key into
        fixed_keys.

        :param in_keys: A sorted list of keys to match with fixed_keys
        :param fixed_keys: A sorted list of keys to match against
        :return: A list of (integer position, [key list]) tuples.
        """
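        # Worked example (illustrative): in_keys=[1, 4, 9], fixed_keys=[3, 7]
        # returns [(0, [1]), (1, [4]), (2, [9])]: 1 sorts before 3, 4 falls
        # between 3 and 7, and 9 sorts after 7.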
        if not len(fixed_keys):
            # no pointers in the fixed_keys list, which means everything must
            # fall to the left
            return [(0, in_keys)]

        # TODO: Iterating both lists will generally take M + N steps
        #       Bisecting each key will generally take M * log2 N steps.
        #       If we had an efficient way to compare, we could pick the method
        #       based on which has the fewer number of steps.
        #       There is also the argument that bisect_right is a compiled
        #       function, so there is even more to be gained.
        # iter_steps = len(in_keys) + len(fixed_keys)
        # bisect_steps = len(in_keys) * math.log(len(fixed_keys), 2)
        if len(in_keys) == 1: # Bisect will always be faster for M = 1
            return [(bisect_right(fixed_keys, in_keys[0]), in_keys)]
        # elif bisect_steps < iter_steps:
        #     offsets = {}
        #     for key in in_keys:
        #         offsets.setdefault(bisect_right(fixed_keys, key),
        #                            []).append(key)
        #     return [(o, offsets[o]) for o in sorted(offsets)]
        in_keys_iter = iter(in_keys)
        fixed_keys_iter = enumerate(fixed_keys)
        cur_in_key = in_keys_iter.next()
        cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()

        class InputDone(Exception): pass
        class FixedDone(Exception): pass

        output = []
        cur_out = []

        # TODO: Another possibility is that rather than iterating on each side,
        #       we could use a combination of bisecting and iterating. For
        #       example, while cur_in_key < fixed_key, bisect to find its
        #       point, then iterate all matching keys, then bisect (restricted
        #       to only the remainder) for the next one, etc.
        try:
            while True:
                if cur_in_key < cur_fixed_key:
                    cur_keys = []
                    cur_out = (cur_fixed_offset, cur_keys)
                    output.append(cur_out)
                    while cur_in_key < cur_fixed_key:
                        cur_keys.append(cur_in_key)
                        try:
                            cur_in_key = in_keys_iter.next()
                        except StopIteration:
                            raise InputDone
                    # At this point cur_in_key must be >= cur_fixed_key
                # step the cur_fixed_key until we pass the cur key, or walk off
                # the end
                while cur_in_key >= cur_fixed_key:
                    try:
                        cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()
                    except StopIteration:
                        raise FixedDone
        except InputDone:
            # We consumed all of the input, nothing more to do
            pass
        except FixedDone:
            # There was some input left, but we consumed all of fixed, so we
            # have to add one more for the tail
            cur_keys = [cur_in_key]
            cur_keys.extend(in_keys_iter)
            cur_out = (len(fixed_keys), cur_keys)
            output.append(cur_out)

        return output

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys supplied. No additional keys will be returned, and every
            key supplied that is in the index will be returned.
        """
        # 6 seconds spent in miss_torture using the sorted() line.
        # Even with out of order disk IO it seems faster not to sort it when
        # large queries are being made.
        # However, now that we are doing multi-way bisecting, we need the keys
        # in sorted order anyway. We could change the multi-way code to not
        # require sorted order. (For example, it bisects for the first node,
        # does an in-order search until a key comes before the current point,
        # which it then bisects for, etc.)
        keys = frozenset(keys)
        if not keys:
            return

        if not self.key_count():
            return

        needed_keys = []
        if self._leaf_value_cache is None:
            needed_keys = keys
        else:
            for key in keys:
                value = self._leaf_value_cache.get(key, None)
                if value is not None:
                    value, refs = value
                    # This key is known to be present; serve it from the cache
                    if self.node_ref_lists:
                        yield (self, key, value, refs)
                    else:
                        yield (self, key, value)
                else:
                    needed_keys.append(key)

        # 6 seconds spent in miss_torture using the sorted() line.
        # Even with out of order disk IO it seems faster not to sort it when
        # large queries are being made.
        needed_keys = sorted(needed_keys)

        nodes_and_keys = [(0, needed_keys)]
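        # Illustrative descent (hypothetical 2-layer index): we start with all
        # keys attached to the root, [(0, needed_keys)], then each pass below
        # bisects the keys against a node's pointers and fans them out to the
        # child pages, e.g. [(1, keys_for_page_1), (2, keys_for_page_2)].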
        for row_pos, next_row_start in enumerate(self._row_offsets[1:-1]):
            node_indexes = [idx for idx, s_keys in nodes_and_keys]
            nodes = self._get_internal_nodes(node_indexes)

            next_nodes_and_keys = []
            for node_index, sub_keys in nodes_and_keys:
                node = nodes[node_index]
                positions = self._multi_bisect_right(sub_keys, node.keys)
                node_offset = next_row_start + node.offset
                next_nodes_and_keys.extend([(node_offset + pos, s_keys)
                                            for pos, s_keys in positions])
            nodes_and_keys = next_nodes_and_keys
        # We should now be at the _LeafNodes
        node_indexes = [idx for idx, s_keys in nodes_and_keys]

        # TODO: We may *not* want to always read all the nodes in one
        #       big go. Consider setting a max size on this.

        nodes = self._get_leaf_nodes(node_indexes)
        for node_index, sub_keys in nodes_and_keys:
            if not sub_keys:
                continue
            node = nodes[node_index]
            for next_sub_key in sub_keys:
                if next_sub_key in node.keys:
                    value, refs = node.keys[next_sub_key]
                    if self.node_ref_lists:
                        yield (self, next_sub_key, value, refs)
                    else:
                        yield (self, next_sub_key, value)

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        WARNING: Note that this method currently causes a full index parse
        unconditionally (which is reasonably appropriate as it is a means for
        thunking many small indices into one larger one and still supplies
        iter_all_entries at the thunk layer).

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        keys = sorted(set(keys))
        if not keys:
            return
        # Load if needed to check key lengths
        if self._key_count is None:
            self._get_root_node()
        # TODO: only access nodes that can satisfy the prefixes we are looking
        # for. For now, to meet API usage (as this function is not used by
        # current bzrlib) just suck the entire index and iterate in memory.
        nodes = {}
        if self.node_ref_lists:
            if self._key_length == 1:
                for _1, key, value, refs in self.iter_all_entries():
                    nodes[key] = value, refs
            else:
                nodes_by_key = {}
                for _1, key, value, refs in self.iter_all_entries():
                    key_value = key, value, refs
                    # For a key of (foo, bar, baz) create
                    # _nodes_by_key[foo][bar][baz] = key_value
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key_value
        else:
            if self._key_length == 1:
                for _1, key, value in self.iter_all_entries():
                    nodes[key] = value
            else:
                nodes_by_key = {}
                for _1, key, value in self.iter_all_entries():
                    key_value = key, value
                    # For a key of (foo, bar, baz) create
                    # _nodes_by_key[foo][bar][baz] = key_value
                    key_dict = nodes_by_key
                    for subkey in key[:-1]:
                        key_dict = key_dict.setdefault(subkey, {})
                    key_dict[key[-1]] = key_value
        if self._key_length == 1:
            for key in keys:
                # sanity check
                if key[0] is None:
                    raise errors.BadIndexKey(key)
                if len(key) != self._key_length:
                    raise errors.BadIndexKey(key)
                try:
                    if self.node_ref_lists:
                        value, node_refs = nodes[key]
                        yield self, key, value, node_refs
                    else:
                        yield self, key, nodes[key]
                except KeyError:
                    pass
            return
        for key in keys:
            # sanity check
            if key[0] is None:
                raise errors.BadIndexKey(key)
            if len(key) != self._key_length:
                raise errors.BadIndexKey(key)
            # find what it refers to:
            key_dict = nodes_by_key
            elements = list(key)
            # find the subdict whose contents should be returned.
            try:
                while len(elements) and elements[0] is not None:
                    key_dict = key_dict[elements[0]]
                    elements.pop(0)
            except KeyError:
                # a non-existent lookup.
                continue
            if len(elements):
                dicts = [key_dict]
                while dicts:
                    key_dict = dicts.pop(-1)
                    # can't be empty or would not exist
                    item, value = key_dict.iteritems().next()
                    if type(value) == dict:
                        # push dict down
                        dicts.extend(key_dict.itervalues())
                    else:
                        # yield keys
                        for value in key_dict.itervalues():
                            # each value is the key:value:node refs tuple
                            yield (self, ) + value
            else:
                # the last thing looked up was a terminal element
                yield (self, ) + key_dict

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For BTreeGraphIndex the estimate is exact as it is contained in the
        header.
        """
        if self._key_count is None:
            self._get_root_node()
        return self._key_count

    def _compute_row_offsets(self):
        """Fill out the _row_offsets attribute based on _row_lengths."""
        offsets = []
        row_offset = 0
        for row in self._row_lengths:
            offsets.append(row_offset)
            row_offset += row
        offsets.append(row_offset)
        self._row_offsets = offsets

    def _parse_header_from_bytes(self, bytes):
        """Parse the header from a region of bytes.

        :param bytes: The data to parse.
        :return: An offset, data tuple such as readv yields, for the unparsed
            data. (which may be of length 0).
        """
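        # An example header region (hypothetical values) as laid out by
        # BTreeBuilder._write_nodes():
        #   B+Tree Graph Index 2\n
        #   node_ref_lists=1\n
        #   key_elements=1\n
        #   len=100\n
        #   row_lengths=1,25\n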
        signature = bytes[0:len(self._signature())]
        if not signature == self._signature():
            raise errors.BadIndexFormatSignature(self._name, BTreeGraphIndex)
        lines = bytes[len(self._signature()):].splitlines()
        options_line = lines[0]
        if not options_line.startswith(_OPTION_NODE_REFS):
            raise errors.BadIndexOptions(self)
        try:
            self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[1]
        if not options_line.startswith(_OPTION_KEY_ELEMENTS):
            raise errors.BadIndexOptions(self)
        try:
            self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[2]
        if not options_line.startswith(_OPTION_LEN):
            raise errors.BadIndexOptions(self)
        try:
            self._key_count = int(options_line[len(_OPTION_LEN):])
        except ValueError:
            raise errors.BadIndexOptions(self)
        options_line = lines[3]
        if not options_line.startswith(_OPTION_ROW_LENGTHS):
            raise errors.BadIndexOptions(self)
        try:
            self._row_lengths = map(int, [length for length in
                options_line[len(_OPTION_ROW_LENGTHS):].split(',')
                if len(length)])
        except ValueError:
            raise errors.BadIndexOptions(self)
        self._compute_row_offsets()

        # calculate the bytes we have processed
        header_end = (len(signature) + sum(map(len, lines[0:4])) + 4)
        return header_end, bytes[header_end:]

    def _read_nodes(self, nodes):
        """Read some nodes from disk into the LRU cache.

        This performs a readv to get the node data into memory, and parses
        each node, then yields it to the caller. The nodes are requested in
        the supplied order. If possible, doing sort() on the list before
        requesting a read may improve performance.

        :param nodes: The nodes to read. 0 - first node, 1 - second node etc.
        """
        ranges = []
        for index in nodes:
            offset = index * _PAGE_SIZE
            size = _PAGE_SIZE
            if index == 0:
                # Root node - special case
                if self._size:
                    size = min(_PAGE_SIZE, self._size)
                else:
                    # We don't know the file size yet; read the first page
                    # and use it to learn the size.
                    stream = self._transport.get(self._name)
                    start = stream.read(_PAGE_SIZE)
                    # Avoid doing this again
                    self._size = len(start)
                    size = min(_PAGE_SIZE, self._size)
            else:
                if offset > self._size:
                    raise AssertionError('tried to read past the end'
                                         ' of the file %s > %s'
                                         % (offset, self._size))
                size = min(size, self._size - offset)
            ranges.append((offset, size))
        if not ranges:
            return
        if self._file is None:
            data_ranges = self._transport.readv(self._name, ranges)
        else:
            data_ranges = []
            for offset, size in ranges:
                self._file.seek(offset)
                data_ranges.append((offset, self._file.read(size)))
        for offset, data in data_ranges:
            if offset == 0:
                # extract the header
                offset, data = self._parse_header_from_bytes(data)
                if len(data) == 0:
                    continue
            bytes = zlib.decompress(data)
            if bytes.startswith(_LEAF_FLAG):
                node = _LeafNode(bytes, self._key_length, self.node_ref_lists)
            elif bytes.startswith(_INTERNAL_FLAG):
                node = _InternalNode(bytes)
            else:
                raise AssertionError("Unknown node type for %r" % bytes)
            yield offset / _PAGE_SIZE, node

    def _signature(self):
        """The file signature for this index type."""
        return _BTSIGNATURE

    def validate(self):
        """Validate that everything in the index can be accessed."""
        # just read and parse every node.
        self._get_root_node()
        if len(self._row_lengths) > 1:
            start_node = self._row_offsets[1]
        else:
            # We shouldn't be reading anything anyway
            start_node = 1
        node_end = self._row_offsets[-1]
        for node in self._read_nodes(range(start_node, node_end)):
            pass


try:
    from bzrlib import _btree_serializer_c as _btree_serializer
except ImportError:
    from bzrlib import _btree_serializer_py as _btree_serializer