204
213
:return: The root chk of the resulting CHKMap.
206
result = CHKMap(store, None, search_key_func=search_key_func)
215
root_key = klass._create_directly(store, initial_value,
216
maximum_size=maximum_size, key_width=key_width,
217
search_key_func=search_key_func)
221
def _create_via_map(klass, store, initial_value, maximum_size=0,
222
key_width=1, search_key_func=None):
223
result = klass(store, None, search_key_func=search_key_func)
207
224
result._root_node.set_maximum_size(maximum_size)
208
225
result._root_node._key_width = key_width
210
227
for key, value in initial_value.items():
211
228
delta.append((None, key, value))
212
return result.apply_delta(delta)
229
root_key = result.apply_delta(delta)
233
def _create_directly(klass, store, initial_value, maximum_size=0,
234
key_width=1, search_key_func=None):
235
node = LeafNode(search_key_func=search_key_func)
236
node.set_maximum_size(maximum_size)
237
node._key_width = key_width
238
node._items = dict(initial_value)
239
node._raw_size = sum([node._key_value_len(key, value)
240
for key,value in initial_value.iteritems()])
241
node._len = len(node._items)
242
node._compute_search_prefix()
243
node._compute_serialised_prefix()
246
and node._current_size() > maximum_size):
247
prefix, node_details = node._split(store)
248
if len(node_details) == 1:
249
raise AssertionError('Failed to split using node._split')
250
node = InternalNode(prefix, search_key_func=search_key_func)
251
node.set_maximum_size(maximum_size)
252
node._key_width = key_width
253
for split, subnode in node_details:
254
node.add_node(split, subnode)
255
keys = list(node.serialise(store))
214
258
def iter_changes(self, basis):
215
259
"""Iterate over the changes between basis and self.
955
1015
# prefix is the key in self._items to use, key_filter is the key_filter
956
1016
# entries that would match this node
958
1019
if key_filter is None:
1020
# yielding all nodes, yield whatever we have, and queue up a read
1021
# for whatever we are missing
959
1023
for prefix, node in self._items.iteritems():
960
if type(node) == tuple:
1024
if node.__class__ is tuple:
961
1025
keys[node] = (prefix, None)
963
1027
yield node, None
1028
elif len(key_filter) == 1:
1029
# Technically, this path could also be handled by the first check
1030
# in 'self._node_width' in length_filters. However, we can handle
1031
# this case without spending any time building up the
1032
# prefix_to_keys, etc state.
1034
# This is a bit ugly, but TIMEIT showed it to be by far the fastest
1035
# 0.626us list(key_filter)[0]
1036
# is a func() for list(), 2 mallocs, and a getitem
1037
# 0.489us [k for k in key_filter][0]
1038
# still has the mallocs, avoids the func() call
1039
# 0.350us iter(key_filter).next()
1040
# has a func() call, and mallocs an iterator
1041
# 0.125us for key in key_filter: pass
1042
# no func() overhead, might malloc an iterator
1043
# 0.105us for key in key_filter: break
1044
# no func() overhead, might malloc an iterator, probably
1045
# avoids checking an 'else' clause as part of the for
1046
for key in key_filter:
1048
search_prefix = self._search_prefix_filter(key)
1049
if len(search_prefix) == self._node_width:
1050
# This item will match exactly, so just do a dict lookup, and
1051
# see what we can return
1054
node = self._items[search_prefix]
1056
# A given key can only match 1 child node, if it isn't
1057
# there, then we can just return nothing
1059
if node.__class__ is tuple:
1060
keys[node] = (search_prefix, [key])
1062
# This is loaded, and the only thing that can match,
1067
# First, convert all keys into a list of search prefixes
1068
# Aggregate common prefixes, and track the keys they come from
966
1069
prefix_to_keys = {}
967
1070
length_filters = {}
968
1071
for key in key_filter:
969
search_key = self._search_prefix_filter(key)
1072
search_prefix = self._search_prefix_filter(key)
970
1073
length_filter = length_filters.setdefault(
971
len(search_key), set())
972
length_filter.add(search_key)
973
prefix_to_keys.setdefault(search_key, []).append(key)
974
length_filters = length_filters.items()
975
for prefix, node in self._items.iteritems():
977
for length, length_filter in length_filters:
978
sub_prefix = prefix[:length]
979
if sub_prefix in length_filter:
980
node_key_filter.extend(prefix_to_keys[sub_prefix])
981
if node_key_filter: # this key matched something, yield it
982
if type(node) == tuple:
983
keys[node] = (prefix, node_key_filter)
1074
len(search_prefix), set())
1075
length_filter.add(search_prefix)
1076
prefix_to_keys.setdefault(search_prefix, []).append(key)
1078
if (self._node_width in length_filters
1079
and len(length_filters) == 1):
1080
# all of the search prefixes match exactly _node_width. This
1081
# means that everything is an exact match, and we can do a
1082
# lookup into self._items, rather than iterating over the items
1084
search_prefixes = length_filters[self._node_width]
1085
for search_prefix in search_prefixes:
1087
node = self._items[search_prefix]
1089
# We can ignore this one
1091
node_key_filter = prefix_to_keys[search_prefix]
1092
if node.__class__ is tuple:
1093
keys[node] = (search_prefix, node_key_filter)
985
1095
yield node, node_key_filter
1097
# The slow way. We walk every item in self._items, and check to
1098
# see if there are any matches
1099
length_filters = length_filters.items()
1100
for prefix, node in self._items.iteritems():
1101
node_key_filter = []
1102
for length, length_filter in length_filters:
1103
sub_prefix = prefix[:length]
1104
if sub_prefix in length_filter:
1105
node_key_filter.extend(prefix_to_keys[sub_prefix])
1106
if node_key_filter: # this key matched something, yield it
1107
if node.__class__ is tuple:
1108
keys[node] = (prefix, node_key_filter)
1110
yield node, node_key_filter
987
1112
# Look in the page cache for some more bytes
988
1113
found_keys = set()
1289
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1290
"""Read the associated records, and determine what is interesting."""
1291
uninteresting_keys = set(uninteresting_keys)
1292
chks_to_read = uninteresting_keys.union(interesting_keys)
1293
next_uninteresting = set()
1294
next_interesting = set()
1295
uninteresting_items = set()
1296
interesting_items = set()
1297
interesting_to_yield = []
1298
for record in store.get_record_stream(chks_to_read, 'unordered', True):
1299
# records_read.add(record.key())
1302
bytes = record.get_bytes_as('fulltext')
1303
# We don't care about search_key_func for this code, because we only
1304
# care about external references.
1305
node = _deserialise(bytes, record.key, search_key_func=None)
1306
if record.key in uninteresting_keys:
1307
if type(node) is InternalNode:
1308
next_uninteresting.update(node.refs())
1310
# We know we are at a LeafNode, so we can pass None for the
1312
uninteresting_items.update(node.iteritems(None))
1314
interesting_to_yield.append(record.key)
1315
if type(node) is InternalNode:
1316
next_interesting.update(node.refs())
1318
interesting_items.update(node.iteritems(None))
1319
return (next_uninteresting, uninteresting_items,
1320
next_interesting, interesting_to_yield, interesting_items)
1323
def _find_all_uninteresting(store, interesting_root_keys,
1324
uninteresting_root_keys, pb):
1325
"""Determine the full set of uninteresting keys."""
1326
# What about duplicates between interesting_root_keys and
1327
# uninteresting_root_keys?
1328
if not uninteresting_root_keys:
1329
# Shortcut case. We know there is nothing uninteresting to filter out
1330
# So we just let the rest of the algorithm do the work
1331
# We know there is nothing uninteresting, and we didn't have to read
1332
# any interesting records yet.
1333
return (set(), set(), set(interesting_root_keys), [], set())
1334
all_uninteresting_chks = set(uninteresting_root_keys)
1335
all_uninteresting_items = set()
1337
# First step, find the direct children of both the interesting and
1339
(uninteresting_keys, uninteresting_items,
1340
interesting_keys, interesting_to_yield,
1341
interesting_items) = _find_children_info(store, interesting_root_keys,
1342
uninteresting_root_keys,
1344
all_uninteresting_chks.update(uninteresting_keys)
1345
all_uninteresting_items.update(uninteresting_items)
1346
del uninteresting_items
1347
# Note: Exact matches between interesting and uninteresting do not need
1348
# to be searched further. Non-exact matches need to be searched in case
1349
# there is a future exact-match
1350
uninteresting_keys.difference_update(interesting_keys)
1352
# Second, find the full set of uninteresting bits reachable by the
1353
# uninteresting roots
1354
chks_to_read = uninteresting_keys
1357
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1358
# TODO: Handle 'absent'
1414
class CHKMapDifference(object):
1415
"""Iterate the stored pages and key,value pairs for (new - old).
1417
This class provides a generator over the stored CHK pages and the
1418
(key, value) pairs that are in any of the new maps and not in any of the
1421
Note that it may yield chk pages that are common (especially root nodes),
1422
but it won't yield (key,value) pairs that are common.
1425
def __init__(self, store, new_root_keys, old_root_keys,
1426
search_key_func, pb=None):
1428
self._new_root_keys = new_root_keys
1429
self._old_root_keys = old_root_keys
1431
# All uninteresting chks that we have seen. By the time they are added
1432
# here, they should be either fully ignored, or queued up for
1434
self._all_old_chks = set(self._old_root_keys)
1435
# All items that we have seen from the old_root_keys
1436
self._all_old_items = set()
1437
# These are interesting items which were either read, or already in the
1438
# interesting queue (so we don't need to walk them again)
1439
self._processed_new_refs = set()
1440
self._search_key_func = search_key_func
1442
# The uninteresting and interesting nodes to be searched
1443
self._old_queue = []
1444
self._new_queue = []
1445
# Holds the (key, value) items found when processing the root nodes,
1446
# waiting for the uninteresting nodes to be walked
1447
self._new_item_queue = []
1450
def _read_nodes_from_store(self, keys):
1451
# We chose not to use _page_cache, because we think in terms of records
1452
# to be yielded. Also, we expect to touch each page only 1 time during
1453
# this code. (We may want to evaluate saving the raw bytes into the
1454
# page cache, which would allow a working tree update after the fetch
1455
# to not have to read the bytes again.)
1456
stream = self._store.get_record_stream(keys, 'unordered', True)
1457
for record in stream:
1458
if self._pb is not None:
1460
if record.storage_kind == 'absent':
1461
raise errors.NoSuchRevision(self._store, record.key)
1361
1462
bytes = record.get_bytes_as('fulltext')
1362
# We don't care about search_key_func for this code, because we
1363
# only care about external references.
1364
node = _deserialise(bytes, record.key, search_key_func=None)
1463
node = _deserialise(bytes, record.key,
1464
search_key_func=self._search_key_func)
1365
1465
if type(node) is InternalNode:
1366
# uninteresting_prefix_chks.update(node._items.iteritems())
1367
chks = node._items.values()
1368
# TODO: We remove the entries that are already in
1369
# uninteresting_chks ?
1370
next_chks.update(chks)
1371
all_uninteresting_chks.update(chks)
1466
# Note we don't have to do node.refs() because we know that
1467
# there are no children that have been pushed into this node
1468
prefix_refs = node._items.items()
1373
all_uninteresting_items.update(node._items.iteritems())
1374
chks_to_read = next_chks
1375
return (all_uninteresting_chks, all_uninteresting_items,
1376
interesting_keys, interesting_to_yield, interesting_items)
1472
items = node._items.items()
1473
yield record, node, prefix_refs, items
1475
def _read_old_roots(self):
    """Read the old root nodes and collect the references beneath them.

    Every record read for ``self._old_root_keys`` adds its items to
    ``self._all_old_items`` and its not-yet-recorded child references to
    ``self._all_old_chks``.

    :return: A list of ``(prefix, ref)`` tuples, one for each child
        reference that was not already present in ``self._all_old_chks``
        when it was encountered. Nothing is appended to the old queue
        here.
    """
    old_chks_to_enqueue = []
    all_old_chks = self._all_old_chks
    for _record, _node, prefix_refs, items in \
            self._read_nodes_from_store(self._old_root_keys):
        # Uninteresting node: keep only the references we have not
        # already marked as old.
        unseen = [p_r for p_r in prefix_refs
                  if p_r[1] not in all_old_chks]
        all_old_chks.update([p_r[1] for p_r in unseen])
        self._all_old_items.update(items)
        # Queue up the uninteresting references, but don't actually put
        # them in the 'to-read' queue until the interesting references
        # have been checked as well.
        old_chks_to_enqueue.extend(unseen)
    return old_chks_to_enqueue
1492
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1493
# At this point, we have read all the uninteresting and interesting
1494
# items, so we can queue up the uninteresting stuff, knowing that we've
1495
# handled the interesting ones
1496
for prefix, ref in old_chks_to_enqueue:
1497
not_interesting = True
1498
for i in xrange(len(prefix), 0, -1):
1499
if prefix[:i] in new_prefixes:
1500
not_interesting = False
1503
# This prefix is not part of the remaining 'interesting set'
1505
self._old_queue.append(ref)
1507
def _read_all_roots(self):
1508
"""Read the root pages.
1510
This is structured as a generator, so that the root records can be
1511
yielded up to whoever needs them without any buffering.
1513
# This is the bootstrap phase
1514
if not self._old_root_keys:
1515
# With no old_root_keys we can just shortcut and be ready
1516
# for _flush_new_queue
1517
self._new_queue = list(self._new_root_keys)
1519
old_chks_to_enqueue = self._read_old_roots()
1520
# filter out any root keys that are already known to be uninteresting
1521
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1522
# These are prefixes that are present in new_keys that we are
1524
new_prefixes = set()
1525
# We are about to yield all of these, so we don't want them getting
1526
# added a second time
1527
processed_new_refs = self._processed_new_refs
1528
processed_new_refs.update(new_keys)
1529
for record, node, prefix_refs, items in \
1530
self._read_nodes_from_store(new_keys):
1531
# At this level, we now know all the uninteresting references
1532
# So we filter and queue up whatever is remaining
1533
prefix_refs = [p_r for p_r in prefix_refs
1534
if p_r[1] not in self._all_old_chks
1535
and p_r[1] not in processed_new_refs]
1536
refs = [p_r[1] for p_r in prefix_refs]
1537
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1538
self._new_queue.extend(refs)
1539
# TODO: We can potentially get multiple items here, however the
1540
# current design allows for this, as callers will do the work
1541
# to make the results unique. We might profile whether we
1542
# gain anything by ensuring unique return values for items
1543
new_items = [item for item in items
1544
if item not in self._all_old_items]
1545
self._new_item_queue.extend(new_items)
1546
new_prefixes.update([self._search_key_func(item[0])
1547
for item in new_items])
1548
processed_new_refs.update(refs)
1550
# For new_prefixes we have the full length prefixes queued up.
1551
# However, we also need possible prefixes. (If we have a known ref to
1552
# 'ab', then we also need to include 'a'.) So expand the
1553
# new_prefixes to include all shorter prefixes
1554
for prefix in list(new_prefixes):
1555
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1556
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1558
def _flush_new_queue(self):
1559
# No need to maintain the heap invariant anymore, just pull things out
1561
refs = set(self._new_queue)
1562
self._new_queue = []
1563
# First pass, flush all interesting items and convert to using direct refs
1564
all_old_chks = self._all_old_chks
1565
processed_new_refs = self._processed_new_refs
1566
all_old_items = self._all_old_items
1567
new_items = [item for item in self._new_item_queue
1568
if item not in all_old_items]
1569
self._new_item_queue = []
1571
yield None, new_items
1572
refs = refs.difference(all_old_chks)
1575
next_refs_update = next_refs.update
1576
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1577
# from 1m54s to 1m51s. Consider it.
1578
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1579
items = [item for item in items
1580
if item not in all_old_items]
1582
next_refs_update([p_r[1] for p_r in p_refs])
1583
next_refs = next_refs.difference(all_old_chks)
1584
next_refs = next_refs.difference(processed_new_refs)
1585
processed_new_refs.update(next_refs)
1588
def _process_next_old(self):
    """Read every node currently on the old queue in a single pass.

    The queue is emptied before reading. For each node read, its items
    are added to ``self._all_old_items``; child references that are not
    yet in ``self._all_old_chks`` are recorded there and appended to the
    (fresh) old queue.
    """
    # Since we don't filter uninteresting any further than during
    # _read_all_roots, process the whole queue in a single pass.
    pending = self._old_queue
    self._old_queue = []
    all_old_chks = self._all_old_chks
    for _record, _node, prefix_refs, items in \
            self._read_nodes_from_store(pending):
        self._all_old_items.update(items)
        # Use distinct names here: 'pending' has already been handed to
        # the reader, and the loop targets must not be clobbered.
        unseen = [ref for _prefix, ref in prefix_refs
                  if ref not in all_old_chks]
        self._old_queue.extend(unseen)
        all_old_chks.update(unseen)
1600
def _process_queues(self):
    """Drain the old queue, then flush the new queue.

    ``self._process_next_old()`` is called repeatedly until
    ``self._old_queue`` is empty.

    :return: Whatever ``self._flush_new_queue()`` returns.
    """
    while self._old_queue:
        self._process_next_old()
    return self._flush_new_queue()
1606
for record in self._read_all_roots():
1608
for record, items in self._process_queues():
1379
1612
def iter_interesting_nodes(store, interesting_root_keys,
1391
1624
(interesting record, {interesting key:values})
1393
# TODO: consider that it may be more memory efficient to use the 20-byte
1394
# sha1 string, rather than tuples of hexadecimal sha1 strings.
1395
# TODO: Try to factor out a lot of the get_record_stream() calls into a
1396
# helper function similar to _read_bytes. This function should be
1397
# able to use nodes from the _page_cache as well as actually
1398
# requesting bytes from the store.
1400
(all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1401
interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1402
interesting_root_keys, uninteresting_root_keys, pb)
1404
# Now that we know everything uninteresting, we can yield information from
1406
interesting_items.difference_update(all_uninteresting_items)
1407
interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1408
if interesting_items:
1409
yield None, interesting_items
1410
if interesting_to_yield:
1411
# We request these records again, rather than buffering the root
1412
# records, most likely they are still in the _group_cache anyway.
1413
for record in store.get_record_stream(interesting_to_yield,
1414
'unordered', False):
1416
all_uninteresting_chks.update(interesting_to_yield)
1417
interesting_keys.difference_update(all_uninteresting_chks)
1419
chks_to_read = interesting_keys
1423
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1426
pb.update('find chk pages', counter)
1427
# TODO: Handle 'absent'?
1428
bytes = record.get_bytes_as('fulltext')
1429
# We don't care about search_key_func for this code, because we
1430
# only care about external references.
1431
node = _deserialise(bytes, record.key, search_key_func=None)
1432
if type(node) is InternalNode:
1433
# all_uninteresting_chks grows large, as it lists all nodes we
1434
# don't want to process (including already seen interesting
1436
# small.difference_update(large) scales O(large), but
1437
# small.difference(large) scales O(small).
1438
# Also, we know we just _deserialised this node, so we can
1439
# access the dict directly.
1440
chks = set(node._items.itervalues()).difference(
1441
all_uninteresting_chks)
1442
# Is set() and .difference_update better than:
1443
# chks = [chk for chk in node.refs()
1444
# if chk not in all_uninteresting_chks]
1445
next_chks.update(chks)
1446
# These are now uninteresting everywhere else
1447
all_uninteresting_chks.update(chks)
1448
interesting_items = []
1450
interesting_items = [item for item in node._items.iteritems()
1451
if item not in all_uninteresting_items]
1452
# TODO: Do we need to filter out items that we have already
1453
# seen on other pages? We don't really want to buffer the
1454
# whole thing, but it does mean that callers need to
1455
# understand they may get duplicate values.
1456
# all_uninteresting_items.update(interesting_items)
1457
yield record, interesting_items
1458
chks_to_read = next_chks
1626
iterator = CHKMapDifference(store, interesting_root_keys,
1627
uninteresting_root_keys,
1628
search_key_func=store._search_key_func,
1630
return iterator.process()