1399
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
    """Read the associated records, and determine what is interesting.

    Every key from both inputs is requested from ``store`` in a single
    ``get_record_stream`` call and deserialised once.  Each record is then
    sorted into the uninteresting or the interesting side; a key that is
    present in both inputs is treated as uninteresting.

    :param store: Object providing ``get_record_stream``.
    :param interesting_keys: Iterable of page keys on the interesting side.
    :param uninteresting_keys: Iterable of page keys on the uninteresting
        side.
    :param pb: Progress bar.  Not touched by the statements visible in this
        block (see the review note in the loop body).
    :return: A 6-tuple ``(next_uninteresting, uninteresting_items,
        next_interesting, interesting_to_yield, interesting_items,
        next_interesting_intersection)``.  The last element is None when no
        interesting internal node was read, otherwise it is the set of refs
        shared by every interesting internal node that was read.
    """
    uninteresting_keys = set(uninteresting_keys)
    chks_to_read = uninteresting_keys.union(interesting_keys)
    next_uninteresting = set()
    next_interesting = set()
    next_interesting_intersection = None
    uninteresting_items = set()
    interesting_items = set()
    interesting_to_yield = []
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
        # records_read.add(record.key())
        # NOTE(review): the embedded line numbers show two source lines
        # missing at this point; presumably a progress update using ``pb``.
        # Confirm against the original file before relying on this block.
        node_bytes = record.get_bytes_as('fulltext')
        # We don't care about search_key_func for this code, because we only
        # care about external references.
        node = _deserialise(node_bytes, record.key, search_key_func=None)
        if record.key in uninteresting_keys:
            if type(node) is InternalNode:
                next_uninteresting.update(node.refs())
            else:
                # We know we are at a LeafNode, so we can pass None for the
                # store argument of iteritems().
                uninteresting_items.update(node.iteritems(None))
        else:
            interesting_to_yield.append(record.key)
            if type(node) is InternalNode:
                if next_interesting_intersection is None:
                    # First interesting internal node seeds the intersection.
                    next_interesting_intersection = set(node.refs())
                else:
                    next_interesting_intersection = \
                        next_interesting_intersection.intersection(node.refs())
                next_interesting.update(node.refs())
            else:
                interesting_items.update(node.iteritems(None))
    return (next_uninteresting, uninteresting_items,
            next_interesting, interesting_to_yield, interesting_items,
            next_interesting_intersection)
1440
def _find_all_uninteresting(store, interesting_root_keys,
1441
uninteresting_root_keys, pb):
1442
"""Determine the full set of uninteresting keys."""
1443
# What about duplicates between interesting_root_keys and
1444
# uninteresting_root_keys?
1445
if not uninteresting_root_keys:
1446
# Shortcut case. We know there is nothing uninteresting to filter out
1447
# So we just let the rest of the algorithm do the work
1448
# We know there is nothing uninteresting, and we didn't have to read
1449
# any interesting records yet.
1450
return (set(), set(), set(interesting_root_keys), [], set())
1451
all_uninteresting_chks = set(uninteresting_root_keys)
1452
all_uninteresting_items = set()
1454
# First step, find the direct children of both the interesting and
1456
(uninteresting_keys, uninteresting_items,
1457
interesting_keys, interesting_to_yield,
1458
interesting_items, interesting_intersection,
1459
) = _find_children_info(store, interesting_root_keys,
1460
uninteresting_root_keys,
1462
all_uninteresting_chks.update(uninteresting_keys)
1463
all_uninteresting_items.update(uninteresting_items)
1464
del uninteresting_items
1465
# Do not examine in detail pages common to all interesting trees.
1466
# Pages that are common to all interesting trees will have their
1467
# older versions found via the uninteresting tree traversal. Some pages
1468
# found via the interesting trees traversal will be uninteresting for
1469
# other of the interesting trees, which is why we require the pages to be
1470
# common for us to trim them.
1471
if interesting_intersection is not None:
1472
uninteresting_keys.difference_update(interesting_intersection)
1474
# Second, find the full set of uninteresting bits reachable by the
1475
# uninteresting roots
1476
chks_to_read = uninteresting_keys
1479
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1480
# TODO: Handle 'absent'
1444
class CHKMapDifference(object):
1445
"""Iterate the stored pages and key,value pairs for (new - old).
1447
This class provides a generator over the stored CHK pages and the
1448
(key, value) pairs that are in any of the new maps and not in any of the
1451
Note that it may yield chk pages that are common (especially root nodes),
1452
but it won't yield (key,value) pairs that are common.
1455
def __init__(self, store, new_root_keys, old_root_keys,
1456
search_key_func, pb=None):
1457
# TODO: Should we add a StaticTuple barrier here? It would be nice to
1458
# force callers to use StaticTuple, because there will often be
1459
# lots of keys passed in here. And even if we cast it locally,
1460
# that just means that we will have *both* a StaticTuple and a
1461
# tuple() in memory, referring to the same object. (so a net
1462
# increase in memory, not a decrease.)
1464
self._new_root_keys = new_root_keys
1465
self._old_root_keys = old_root_keys
1467
# All uninteresting chks that we have seen. By the time they are added
1468
# here, they should be either fully ignored, or queued up for
1470
# TODO: This might grow to a large size if there are lots of merge
1471
# parents, etc. However, it probably doesn't scale to O(history)
1472
# like _processed_new_refs does.
1473
self._all_old_chks = set(self._old_root_keys)
1474
# All items that we have seen from the old_root_keys
1475
self._all_old_items = set()
1476
# These are interesting items which were either read, or already in the
1477
# interesting queue (so we don't need to walk them again)
1478
# TODO: processed_new_refs becomes O(all_chks), consider switching to
1480
self._processed_new_refs = set()
1481
self._search_key_func = search_key_func
1483
# The uninteresting and interesting nodes to be searched
1484
self._old_queue = []
1485
self._new_queue = []
1486
# Holds the (key, value) items found when processing the root nodes,
1487
# waiting for the uninteresting nodes to be walked
1488
self._new_item_queue = []
1491
def _read_nodes_from_store(self, keys):
1492
# We chose not to use _page_cache, because we think in terms of records
1493
# to be yielded. Also, we expect to touch each page only 1 time during
1494
# this code. (We may want to evaluate saving the raw bytes into the
1495
# page cache, which would allow a working tree update after the fetch
1496
# to not have to read the bytes again.)
1497
as_st = StaticTuple.from_sequence
1498
stream = self._store.get_record_stream(keys, 'unordered', True)
1499
for record in stream:
1500
if self._pb is not None:
1502
if record.storage_kind == 'absent':
1503
raise errors.NoSuchRevision(self._store, record.key)
1483
1504
bytes = record.get_bytes_as('fulltext')
1484
# We don't care about search_key_func for this code, because we
1485
# only care about external references.
1486
node = _deserialise(bytes, record.key, search_key_func=None)
1505
node = _deserialise(bytes, record.key,
1506
search_key_func=self._search_key_func)
1487
1507
if type(node) is InternalNode:
1488
# uninteresting_prefix_chks.update(node._items.iteritems())
1489
chks = node._items.values()
1490
# TODO: We remove the entries that are already in
1491
# uninteresting_chks ?
1492
next_chks.update(chks)
1493
all_uninteresting_chks.update(chks)
1508
# Note we don't have to do node.refs() because we know that
1509
# there are no children that have been pushed into this node
1510
# Note: Using as_st() here seemed to save 1.2MB, which would
1511
# indicate that we keep 100k prefix_refs around while
1512
# processing. They *should* be shorter lived than that...
1513
# It does cost us ~10s of processing time
1514
#prefix_refs = [as_st(item) for item in node._items.iteritems()]
1515
prefix_refs = node._items.items()
1495
all_uninteresting_items.update(node._items.iteritems())
1496
chks_to_read = next_chks
1497
return (all_uninteresting_chks, all_uninteresting_items,
1498
interesting_keys, interesting_to_yield, interesting_items)
1519
# Note: We don't use a StaticTuple here. Profiling showed a
1520
# minor memory improvement (0.8MB out of 335MB peak 0.2%)
1521
# But a significant slowdown (15s / 145s, or 10%)
1522
items = node._items.items()
1523
yield record, node, prefix_refs, items
1525
def _read_old_roots(self):
1526
old_chks_to_enqueue = []
1527
all_old_chks = self._all_old_chks
1528
for record, node, prefix_refs, items in \
1529
self._read_nodes_from_store(self._old_root_keys):
1530
# Uninteresting node
1531
prefix_refs = [p_r for p_r in prefix_refs
1532
if p_r[1] not in all_old_chks]
1533
new_refs = [p_r[1] for p_r in prefix_refs]
1534
all_old_chks.update(new_refs)
1535
# TODO: This might be a good time to turn items into StaticTuple
1536
# instances and possibly intern them. However, this does not
1537
# impact 'initial branch' performance, so I'm not worrying
1539
self._all_old_items.update(items)
1540
# Queue up the uninteresting references
1541
# Don't actually put them in the 'to-read' queue until we have
1542
# finished checking the interesting references
1543
old_chks_to_enqueue.extend(prefix_refs)
1544
return old_chks_to_enqueue
1546
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1547
# At this point, we have read all the uninteresting and interesting
1548
# items, so we can queue up the uninteresting stuff, knowing that we've
1549
# handled the interesting ones
1550
for prefix, ref in old_chks_to_enqueue:
1551
not_interesting = True
1552
for i in xrange(len(prefix), 0, -1):
1553
if prefix[:i] in new_prefixes:
1554
not_interesting = False
1557
# This prefix is not part of the remaining 'interesting set'
1559
self._old_queue.append(ref)
1561
def _read_all_roots(self):
1562
"""Read the root pages.
1564
This is structured as a generator, so that the root records can be
1565
yielded up to whoever needs them without any buffering.
1567
# This is the bootstrap phase
1568
if not self._old_root_keys:
1569
# With no old_root_keys we can just shortcut and be ready
1570
# for _flush_new_queue
1571
self._new_queue = list(self._new_root_keys)
1573
old_chks_to_enqueue = self._read_old_roots()
1574
# filter out any root keys that are already known to be uninteresting
1575
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1576
# These are prefixes that are present in new_keys that we are
1578
new_prefixes = set()
1579
# We are about to yield all of these, so we don't want them getting
1580
# added a second time
1581
processed_new_refs = self._processed_new_refs
1582
processed_new_refs.update(new_keys)
1583
for record, node, prefix_refs, items in \
1584
self._read_nodes_from_store(new_keys):
1585
# At this level, we now know all the uninteresting references
1586
# So we filter and queue up whatever is remaining
1587
prefix_refs = [p_r for p_r in prefix_refs
1588
if p_r[1] not in self._all_old_chks
1589
and p_r[1] not in processed_new_refs]
1590
refs = [p_r[1] for p_r in prefix_refs]
1591
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1592
self._new_queue.extend(refs)
1593
# TODO: We can potentially get multiple items here, however the
1594
# current design allows for this, as callers will do the work
1595
# to make the results unique. We might profile whether we
1596
# gain anything by ensuring unique return values for items
1597
# TODO: This might be a good time to cast to StaticTuple, as
1598
# self._new_item_queue will hold the contents of multiple
1599
# records for an extended lifetime
1600
new_items = [item for item in items
1601
if item not in self._all_old_items]
1602
self._new_item_queue.extend(new_items)
1603
new_prefixes.update([self._search_key_func(item[0])
1604
for item in new_items])
1605
processed_new_refs.update(refs)
1607
# For new_prefixes we have the full length prefixes queued up.
1608
# However, we also need possible prefixes. (If we have a known ref to
1609
# 'ab', then we also need to include 'a'.) So expand the
1610
# new_prefixes to include all shorter prefixes
1611
for prefix in list(new_prefixes):
1612
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1613
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1615
def _flush_new_queue(self):
1616
# No need to maintain the heap invariant anymore, just pull things out
1618
refs = set(self._new_queue)
1619
self._new_queue = []
1620
# First pass, flush all interesting items and convert to using direct refs
1621
all_old_chks = self._all_old_chks
1622
processed_new_refs = self._processed_new_refs
1623
all_old_items = self._all_old_items
1624
new_items = [item for item in self._new_item_queue
1625
if item not in all_old_items]
1626
self._new_item_queue = []
1628
yield None, new_items
1629
refs = refs.difference(all_old_chks)
1630
processed_new_refs.update(refs)
1632
# TODO: Using a SimpleSet for self._processed_new_refs and
1633
# saved as much as 10MB of peak memory. However, it requires
1634
# implementing a non-pyrex version.
1636
next_refs_update = next_refs.update
1637
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1638
# from 1m54s to 1m51s. Consider it.
1639
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1641
# using the 'if' check saves about 145s => 141s, when
1642
# streaming initial branch of Launchpad data.
1643
items = [item for item in items
1644
if item not in all_old_items]
1646
next_refs_update([p_r[1] for p_r in p_refs])
1648
# set1.difference(set/dict) walks all of set1, and checks if it
1649
# exists in 'other'.
1650
# set1.difference(iterable) walks all of iterable, and does a
1651
# 'difference_update' on a clone of set1. Pick wisely based on the
1652
# expected sizes of objects.
1653
# in our case it is expected that 'new_refs' will always be quite
1655
next_refs = next_refs.difference(all_old_chks)
1656
next_refs = next_refs.difference(processed_new_refs)
1657
processed_new_refs.update(next_refs)
1660
def _process_next_old(self):
1661
# Since we don't filter uninteresting any further than during
1662
# _read_all_roots, process the whole queue in a single pass.
1663
refs = self._old_queue
1664
self._old_queue = []
1665
all_old_chks = self._all_old_chks
1666
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1667
# TODO: Use StaticTuple here?
1668
self._all_old_items.update(items)
1669
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1670
self._old_queue.extend(refs)
1671
all_old_chks.update(refs)
1673
def _process_queues(self):
1674
while self._old_queue:
1675
self._process_next_old()
1676
return self._flush_new_queue()
1679
for record in self._read_all_roots():
1681
for record, items in self._process_queues():
1501
1685
def iter_interesting_nodes(store, interesting_root_keys,
1513
1697
(interesting record, {interesting key:values})
1515
# TODO: consider that it may be more memory efficient to use the 20-byte
1516
# sha1 string, rather than tuples of hexadecimal sha1 strings.
1517
# TODO: Try to factor out a lot of the get_record_stream() calls into a
1518
# helper function similar to _read_bytes. This function should be
1519
# able to use nodes from the _page_cache as well as actually
1520
# requesting bytes from the store.
1522
(all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1523
interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1524
interesting_root_keys, uninteresting_root_keys, pb)
1526
# Now that we know everything uninteresting, we can yield information from
1528
interesting_items.difference_update(all_uninteresting_items)
1529
interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1530
if interesting_items:
1531
yield None, interesting_items
1532
if interesting_to_yield:
1533
# We request these records again, rather than buffering the root
1534
# records, most likely they are still in the _group_cache anyway.
1535
for record in store.get_record_stream(interesting_to_yield,
1536
'unordered', False):
1538
all_uninteresting_chks.update(interesting_to_yield)
1539
interesting_keys.difference_update(all_uninteresting_chks)
1541
chks_to_read = interesting_keys
1545
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1548
pb.update('find chk pages', counter)
1549
# TODO: Handle 'absent'?
1550
bytes = record.get_bytes_as('fulltext')
1551
# We don't care about search_key_func for this code, because we
1552
# only care about external references.
1553
node = _deserialise(bytes, record.key, search_key_func=None)
1554
if type(node) is InternalNode:
1555
# all_uninteresting_chks grows large, as it lists all nodes we
1556
# don't want to process (including already seen interesting
1558
# small.difference_update(large) scales O(large), but
1559
# small.difference(large) scales O(small).
1560
# Also, we know we just _deserialised this node, so we can
1561
# access the dict directly.
1562
chks = set(node._items.itervalues()).difference(
1563
all_uninteresting_chks)
1564
# Is set() and .difference_update better than:
1565
# chks = [chk for chk in node.refs()
1566
# if chk not in all_uninteresting_chks]
1567
next_chks.update(chks)
1568
# These are now uninteresting everywhere else
1569
all_uninteresting_chks.update(chks)
1570
interesting_items = []
1572
interesting_items = [item for item in node._items.iteritems()
1573
if item not in all_uninteresting_items]
1574
# TODO: Do we need to filter out items that we have already
1575
# seen on other pages? We don't really want to buffer the
1576
# whole thing, but it does mean that callers need to
1577
# understand they may get duplicate values.
1578
# all_uninteresting_items.update(interesting_items)
1579
yield record, interesting_items
1580
chks_to_read = next_chks
1699
iterator = CHKMapDifference(store, interesting_root_keys,
1700
uninteresting_root_keys,
1701
search_key_func=store._search_key_func,
1703
return iterator.process()