def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
    """Read the associated records, and determine what is interesting.

    All keys are read in a single pass over the store; each record is
    classified by whether its key came from ``uninteresting_keys``, and
    InternalNode child references are separated from LeafNode items.

    :param store: Object with a ``get_record_stream`` method to read
        records from.
    :param interesting_keys: Keys whose content we want to yield.
    :param uninteresting_keys: Keys whose content should be filtered out.
    :param pb: Optional progress indicator (ticked per record), or None.
    :return: A tuple of (next_uninteresting, uninteresting_items,
        next_interesting, interesting_to_yield, interesting_items,
        next_interesting_intersection), where next_interesting_intersection
        is the set of child refs common to *all* interesting internal
        nodes, or None if no interesting internal node was seen.
    """
    uninteresting_keys = set(uninteresting_keys)
    chks_to_read = uninteresting_keys.union(interesting_keys)
    next_uninteresting = set()
    next_interesting = set()
    next_interesting_intersection = None
    uninteresting_items = set()
    interesting_items = set()
    interesting_to_yield = []
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
        # records_read.add(record.key())
        if pb is not None:
            pb.tick()
        bytes = record.get_bytes_as('fulltext')
        # We don't care about search_key_func for this code, because we only
        # care about external references.
        node = _deserialise(bytes, record.key, search_key_func=None)
        if record.key in uninteresting_keys:
            if type(node) is InternalNode:
                next_uninteresting.update(node.refs())
            else:
                # We know we are at a LeafNode, so we can pass None for the
                # store
                uninteresting_items.update(node.iteritems(None))
        else:
            interesting_to_yield.append(record.key)
            if type(node) is InternalNode:
                if next_interesting_intersection is None:
                    next_interesting_intersection = set(node.refs())
                else:
                    next_interesting_intersection = \
                        next_interesting_intersection.intersection(node.refs())
                next_interesting.update(node.refs())
            else:
                interesting_items.update(node.iteritems(None))
    return (next_uninteresting, uninteresting_items,
            next_interesting, interesting_to_yield, interesting_items,
            next_interesting_intersection)
def _find_all_uninteresting(store, interesting_root_keys,
                            uninteresting_root_keys, pb):
    """Determine the full set of uninteresting keys.

    Walks the pages reachable from ``uninteresting_root_keys`` to closure,
    collecting every chk page reference and every (key, value) item they
    contain, so later traversal of the interesting trees can filter them.

    :param store: Object with a ``get_record_stream`` method.
    :param interesting_root_keys: Root keys of the trees we want to yield.
    :param uninteresting_root_keys: Root keys of the trees to filter out.
    :param pb: Optional progress indicator (ticked per record), or None.
    :return: A tuple of (all_uninteresting_chks, all_uninteresting_items,
        interesting_keys, interesting_to_yield, interesting_items).
    """
    # What about duplicates between interesting_root_keys and
    # uninteresting_root_keys?
    if not uninteresting_root_keys:
        # Shortcut case. We know there is nothing uninteresting to filter out
        # So we just let the rest of the algorithm do the work
        # We know there is nothing uninteresting, and we didn't have to read
        # any interesting records yet.
        return (set(), set(), set(interesting_root_keys), [], set())
    all_uninteresting_chks = set(uninteresting_root_keys)
    all_uninteresting_items = set()

    # First step, find the direct children of both the interesting and
    # uninteresting keys
    (uninteresting_keys, uninteresting_items,
     interesting_keys, interesting_to_yield,
     interesting_items, interesting_intersection,
     ) = _find_children_info(store, interesting_root_keys,
                             uninteresting_root_keys,
                             pb=pb)
    all_uninteresting_chks.update(uninteresting_keys)
    all_uninteresting_items.update(uninteresting_items)
    del uninteresting_items
    # Do not examine in detail pages common to all interesting trees.
    # Pages that are common to all interesting trees will have their
    # older versions found via the uninteresting tree traversal. Some pages
    # found via the interesting trees traversal will be uninteresting for
    # other of the interesting trees, which is why we require the pages to be
    # common for us to trim them.
    if interesting_intersection is not None:
        uninteresting_keys.difference_update(interesting_intersection)

    # Second, find the full set of uninteresting bits reachable by the
    # uninteresting roots
    chks_to_read = uninteresting_keys
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered',
                                              False):
            # TODO: Handle 'absent'
            if pb is not None:
                pb.tick()
            bytes = record.get_bytes_as('fulltext')
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if type(node) is InternalNode:
                # uninteresting_prefix_chks.update(node._items.iteritems())
                chks = node._items.values()
                # TODO: We remove the entries that are already in
                #       uninteresting_chks ?
                next_chks.update(chks)
                all_uninteresting_chks.update(chks)
            else:
                all_uninteresting_items.update(node._items.iteritems())
        chks_to_read = next_chks
    return (all_uninteresting_chks, all_uninteresting_items,
            interesting_keys, interesting_to_yield, interesting_items)
class CHKMapDifference(object):
1400
"""Iterate the stored pages and key,value pairs for (new - old).
1402
This class provides a generator over the stored CHK pages and the
1403
(key, value) pairs that are in any of the new maps and not in any of the
1406
Note that it may yield chk pages that are common (especially root nodes),
1407
but it won't yield (key,value) pairs that are common.
1410
def __init__(self, store, new_root_keys, old_root_keys,
1411
search_key_func, pb=None):
1413
self._new_root_keys = new_root_keys
1414
self._old_root_keys = old_root_keys
1416
# All uninteresting chks that we have seen. By the time they are added
1417
# here, they should be either fully ignored, or queued up for
1419
self._all_old_chks = set(self._old_root_keys)
1420
# All items that we have seen from the old_root_keys
1421
self._all_old_items = set()
1422
# These are interesting items which were either read, or already in the
1423
# interesting queue (so we don't need to walk them again)
1424
self._processed_new_refs = set()
1425
self._search_key_func = search_key_func
1427
# The uninteresting and interesting nodes to be searched
1428
self._old_queue = []
1429
self._new_queue = []
1430
# Holds the (key, value) items found when processing the root nodes,
1431
# waiting for the uninteresting nodes to be walked
1432
self._new_item_queue = []
1435
def _read_nodes_from_store(self, keys):
1436
# We chose not to use _page_cache, because we think in terms of records
1437
# to be yielded. Also, we expect to touch each page only 1 time during
1438
# this code. (We may want to evaluate saving the raw bytes into the
1439
# page cache, which would allow a working tree update after the fetch
1440
# to not have to read the bytes again.)
1441
stream = self._store.get_record_stream(keys, 'unordered', True)
1442
for record in stream:
1443
if self._pb is not None:
1445
if record.storage_kind == 'absent':
1446
raise errors.NoSuchRevision(self._store, record.key)
1485
1447
bytes = record.get_bytes_as('fulltext')
1486
# We don't care about search_key_func for this code, because we
1487
# only care about external references.
1488
node = _deserialise(bytes, record.key, search_key_func=None)
1448
node = _deserialise(bytes, record.key,
1449
search_key_func=self._search_key_func)
1489
1450
if type(node) is InternalNode:
1490
# uninteresting_prefix_chks.update(node._items.iteritems())
1491
chks = node._items.values()
1492
# TODO: We remove the entries that are already in
1493
# uninteresting_chks ?
1494
next_chks.update(chks)
1495
all_uninteresting_chks.update(chks)
1451
# Note we don't have to do node.refs() because we know that
1452
# there are no children that have been pushed into this node
1453
prefix_refs = node._items.items()
1497
all_uninteresting_items.update(node._items.iteritems())
1498
chks_to_read = next_chks
1499
return (all_uninteresting_chks, all_uninteresting_items,
1500
interesting_keys, interesting_to_yield, interesting_items)
1457
items = node._items.items()
1458
yield record, node, prefix_refs, items
1460
def _read_old_roots(self):
1461
old_chks_to_enqueue = []
1462
all_old_chks = self._all_old_chks
1463
for record, node, prefix_refs, items in \
1464
self._read_nodes_from_store(self._old_root_keys):
1465
# Uninteresting node
1466
prefix_refs = [p_r for p_r in prefix_refs
1467
if p_r[1] not in all_old_chks]
1468
new_refs = [p_r[1] for p_r in prefix_refs]
1469
all_old_chks.update(new_refs)
1470
self._all_old_items.update(items)
1471
# Queue up the uninteresting references
1472
# Don't actually put them in the 'to-read' queue until we have
1473
# finished checking the interesting references
1474
old_chks_to_enqueue.extend(prefix_refs)
1475
return old_chks_to_enqueue
1477
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1478
# At this point, we have read all the uninteresting and interesting
1479
# items, so we can queue up the uninteresting stuff, knowing that we've
1480
# handled the interesting ones
1481
for prefix, ref in old_chks_to_enqueue:
1482
not_interesting = True
1483
for i in xrange(len(prefix), 0, -1):
1484
if prefix[:i] in new_prefixes:
1485
not_interesting = False
1488
# This prefix is not part of the remaining 'interesting set'
1490
self._old_queue.append(ref)
1492
def _read_all_roots(self):
1493
"""Read the root pages.
1495
This is structured as a generator, so that the root records can be
1496
yielded up to whoever needs them without any buffering.
1498
# This is the bootstrap phase
1499
if not self._old_root_keys:
1500
# With no old_root_keys we can just shortcut and be ready
1501
# for _flush_new_queue
1502
self._new_queue = list(self._new_root_keys)
1504
old_chks_to_enqueue = self._read_old_roots()
1505
# filter out any root keys that are already known to be uninteresting
1506
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1507
# These are prefixes that are present in new_keys that we are
1509
new_prefixes = set()
1510
# We are about to yield all of these, so we don't want them getting
1511
# added a second time
1512
processed_new_refs = self._processed_new_refs
1513
processed_new_refs.update(new_keys)
1514
for record, node, prefix_refs, items in \
1515
self._read_nodes_from_store(new_keys):
1516
# At this level, we now know all the uninteresting references
1517
# So we filter and queue up whatever is remaining
1518
prefix_refs = [p_r for p_r in prefix_refs
1519
if p_r[1] not in self._all_old_chks
1520
and p_r[1] not in processed_new_refs]
1521
refs = [p_r[1] for p_r in prefix_refs]
1522
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1523
self._new_queue.extend(refs)
1524
# TODO: We can potentially get multiple items here, however the
1525
# current design allows for this, as callers will do the work
1526
# to make the results unique. We might profile whether we
1527
# gain anything by ensuring unique return values for items
1528
new_items = [item for item in items
1529
if item not in self._all_old_items]
1530
self._new_item_queue.extend(new_items)
1531
new_prefixes.update([self._search_key_func(item[0])
1532
for item in new_items])
1533
processed_new_refs.update(refs)
1535
# For new_prefixes we have the full length prefixes queued up.
1536
# However, we also need possible prefixes. (If we have a known ref to
1537
# 'ab', then we also need to include 'a'.) So expand the
1538
# new_prefixes to include all shorter prefixes
1539
for prefix in list(new_prefixes):
1540
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1541
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1543
def _flush_new_queue(self):
1544
# No need to maintain the heap invariant anymore, just pull things out
1546
refs = set(self._new_queue)
1547
self._new_queue = []
1548
# First pass, flush all interesting items and convert to using direct refs
1549
all_old_chks = self._all_old_chks
1550
processed_new_refs = self._processed_new_refs
1551
all_old_items = self._all_old_items
1552
new_items = [item for item in self._new_item_queue
1553
if item not in all_old_items]
1554
self._new_item_queue = []
1556
yield None, new_items
1557
refs = refs.difference(all_old_chks)
1560
next_refs_update = next_refs.update
1561
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1562
# from 1m54s to 1m51s. Consider it.
1563
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1564
items = [item for item in items
1565
if item not in all_old_items]
1567
next_refs_update([p_r[1] for p_r in p_refs])
1568
next_refs = next_refs.difference(all_old_chks)
1569
next_refs = next_refs.difference(processed_new_refs)
1570
processed_new_refs.update(next_refs)
1573
def _process_next_old(self):
1574
# Since we don't filter uninteresting any further than during
1575
# _read_all_roots, process the whole queue in a single pass.
1576
refs = self._old_queue
1577
self._old_queue = []
1578
all_old_chks = self._all_old_chks
1579
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1580
self._all_old_items.update(items)
1581
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1582
self._old_queue.extend(refs)
1583
all_old_chks.update(refs)
1585
def _process_queues(self):
1586
while self._old_queue:
1587
self._process_next_old()
1588
return self._flush_new_queue()
1591
for record in self._read_all_roots():
1593
for record, items in self._process_queues():
1503
1597
def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    Evaluate nodes referenced by interesting_root_keys. Ones that are also
    referenced from uninteresting_root_keys are not considered interesting.

    :param store: Object with a ``get_record_stream`` method and a
        ``_search_key_func`` attribute.
    :param interesting_root_keys: keys which should be part of the
        "interesting" nodes (which will be yielded)
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :param pb: Optional progress indicator, or None.
    :return: Yield
        (interesting record, {interesting key:values})
    """
    iterator = CHKMapDifference(store, interesting_root_keys,
                                uninteresting_root_keys,
                                search_key_func=store._search_key_func,
                                pb=pb)
    return iterator.process()