        self._interesting_root_keys = interesting_root_keys
        self._uninteresting_root_keys = uninteresting_root_keys
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing.
        self._all_uninteresting_chks = set(self._uninteresting_root_keys)
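        # Seeding the set with the root keys themselves means _read_all_roots
        # can immediately drop any interesting root that is literally the same
        # page as an uninteresting root.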
        # All items that we have seen from the uninteresting_root_keys
        self._all_uninteresting_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        self._processed_interesting_refs = set()
        self._search_key_func = search_key_func
        # ...
        self._state = None

    def _read_nodes_from_store(self, keys):
        # We chose not to use _page_cache, because we think in terms of records
        # to be yielded. Also, we expect to touch each page only 1 time during
        # this code. (We may want to evaluate saving the raw bytes into the
        # page cache, which would allow a working tree update after the fetch
        # to not have to read the bytes again.)
        stream = self._store.get_record_stream(keys, 'unordered', True)
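        # 'unordered' lets the store hand back records in whatever order is
        # cheapest for it; the trailing True is include_delta_closure in the
        # VersionedFiles.get_record_stream API, asking for records that can
        # be expanded to full texts.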
        for record in stream:
            if self._pb is not None:
                # ...
                items = node._items.items()
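            # prefix_refs holds (prefix, chk key) pairs naming the child pages
            # of an internal node; items holds the (key, value) entries of a
            # leaf. For any given node only one of the two is non-empty.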
            yield record, node, prefix_refs, items

    def _read_uninteresting_roots(self):
        uninteresting_chks_to_enqueue = []
        all_uninteresting_chks = self._all_uninteresting_chks
        processed_interesting_refs = self._processed_interesting_refs
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._uninteresting_root_keys):
            # Uninteresting node
            # ...
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            uninteresting_chks_to_enqueue.extend(prefix_refs)
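            # Deferring the enqueue matters: a page referenced from an
            # uninteresting root may also sit under an interesting prefix,
            # and we only learn those prefixes while reading the other roots.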
        return uninteresting_chks_to_enqueue

    def _enqueue_uninteresting(self, interesting_prefixes,
                               uninteresting_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that we've
        # handled the interesting ones
        for prefix, ref in uninteresting_chks_to_enqueue:
            not_interesting = True
            for i in xrange(len(prefix), 0, -1):
                if prefix[:i] in interesting_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._uninteresting_queue.append(ref)
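        # Worked example with hypothetical prefixes: if interesting_prefixes
        # is {'a', 'ab', 'abc'}, a ref under 'abcd' is queued (its leading
        # substring 'abc' matches), while a ref under 'x' is dropped, since
        # no interesting page can overlap with it.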

    def _read_all_roots(self):
        """Read the root pages.

        This is structured as a generator, so that the root records can be
        yielded up to whoever needs them without any buffering.
        """
        # This is the bootstrap phase
        if not self._uninteresting_root_keys:
            # With no uninteresting_root_keys we can just shortcut and be ready
            # for _flush_interesting_queue
            self._interesting_queue = list(self._interesting_root_keys)
            return
        # Read the uninteresting nodes first. We would like to read them
        # simultaneously, but that would require buffering the interesting
        # nodes until all uninteresting ones have been read.
        uninteresting_chks_to_enqueue = self._read_uninteresting_roots()
        # filter out any root keys that are already known to be uninteresting
        interesting_keys = set(self._interesting_root_keys).difference(
            self._all_uninteresting_chks)
        # These are prefixes that are present in interesting_keys that we are
        # going to yield
        interesting_prefixes = set()
        # We are about to yield all of these, so we don't want them getting
        # added a second time
        processed_interesting_refs = self._processed_interesting_refs
        processed_interesting_refs.update(interesting_keys)
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(interesting_keys):
            # At this level, we now know all the uninteresting references
            # So we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_uninteresting_chks
                           and p_r[1] not in processed_interesting_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            interesting_prefixes.update([p_r[0] for p_r in prefix_refs])
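            # Remembering these prefixes is what later lets
            # _enqueue_uninteresting decide which uninteresting pages can
            # still overlap with pages we are going to walk.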
        # ...
        for prefix in list(interesting_prefixes):
            interesting_prefixes.update([prefix[:i]
                                         for i in xrange(1, len(prefix))])
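        # Expanding every leading substring ('abc' also adds 'ab' and 'a')
        # means an uninteresting prefix that is shorter than an interesting
        # one still gets a direct hit in _enqueue_uninteresting's membership
        # test.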
        self._enqueue_uninteresting(interesting_prefixes,
                                    uninteresting_chks_to_enqueue)

    def _flush_interesting_queue(self):
        # No need to maintain the heap invariant anymore, just pull things out
        # ...
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                items = [item for item in items
                         if item not in all_uninteresting_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
            next_refs = next_refs.difference(all_uninteresting_chks)
            next_refs = next_refs.difference(processed_interesting_refs)
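            # Two difference passes: first drop refs already known to be
            # uninteresting, then refs already read or queued, so each page
            # is fetched at most once per fetch operation.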
            processed_interesting_refs.update(next_refs)
            refs = next_refs

    def _process_next_uninteresting(self):
        # Since we don't filter uninteresting any further than during
        # _read_all_roots, process the whole queue in a single pass.
        refs = self._uninteresting_queue
        self._uninteresting_queue = []
        all_uninteresting_chks = self._all_uninteresting_chks
        # ...
        all_uninteresting_chks.update(refs)
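        # Note that this walk only widens the uninteresting sets; nothing
        # read here is ever yielded to the caller.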

    def _process_queues(self):
        while self._uninteresting_queue:
            self._process_next_uninteresting()
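        # _flush_interesting_queue is a generator, so the return below hands
        # the caller a stream of (record, items) pairs without buffering.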
        return self._flush_interesting_queue()

        # ...
        for record in self._read_all_roots():
            # ...
        for record, items in self._process_queues():
            # ...

def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
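    # A minimal sketch of the expected wiring (hypothetical names; the class
    # name and its constructor signature are not shown in this excerpt):
    #
    #   iterator = _Differencer(store, interesting_root_keys,
    #                           uninteresting_root_keys,
    #                           search_key_func=..., pb=pb)
    #   return iterator.process()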