/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-16 04:56:26 UTC
  • mfrom: (4538 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4541.
  • Revision ID: andrew.bennetts@canonical.com-20090716045626-s5d9fiojyrf9bx30
Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
38
38
"""
39
39
 
40
40
import heapq
41
 
import time
42
41
 
43
42
from bzrlib import lazy_import
44
43
lazy_import.lazy_import(globals(), """
45
44
from bzrlib import versionedfile
46
45
""")
47
46
from bzrlib import (
48
 
    errors,
49
47
    lru_cache,
50
48
    osutils,
51
49
    registry,
1398
1396
    return node
1399
1397
 
1400
1398
 
1401
 
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1402
 
    """Read the associated records, and determine what is interesting."""
1403
 
    uninteresting_keys = set(uninteresting_keys)
1404
 
    chks_to_read = uninteresting_keys.union(interesting_keys)
1405
 
    next_uninteresting = set()
1406
 
    next_interesting = set()
1407
 
    uninteresting_items = set()
1408
 
    interesting_items = set()
1409
 
    interesting_to_yield = []
1410
 
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
1411
 
        # records_read.add(record.key())
1412
 
        if pb is not None:
1413
 
            pb.tick()
1414
 
        bytes = record.get_bytes_as('fulltext')
1415
 
        # We don't care about search_key_func for this code, because we only
1416
 
        # care about external references.
1417
 
        node = _deserialise(bytes, record.key, search_key_func=None)
1418
 
        if record.key in uninteresting_keys:
1419
 
            if type(node) is InternalNode:
1420
 
                next_uninteresting.update(node.refs())
1421
 
            else:
1422
 
                # We know we are at a LeafNode, so we can pass None for the
1423
 
                # store
1424
 
                uninteresting_items.update(node.iteritems(None))
1425
 
        else:
1426
 
            interesting_to_yield.append(record.key)
1427
 
            if type(node) is InternalNode:
1428
 
                next_interesting.update(node.refs())
1429
 
            else:
1430
 
                interesting_items.update(node.iteritems(None))
1431
 
    return (next_uninteresting, uninteresting_items,
1432
 
            next_interesting, interesting_to_yield, interesting_items)
1433
 
 
1434
 
 
1435
 
def _find_all_uninteresting(store, interesting_root_keys,
1436
 
                            uninteresting_root_keys, pb):
1437
 
    """Determine the full set of uninteresting keys."""
1438
 
    # What about duplicates between interesting_root_keys and
1439
 
    # uninteresting_root_keys?
1440
 
    if not uninteresting_root_keys:
1441
 
        # Shortcut case. We know there is nothing uninteresting to filter out
1442
 
        # So we just let the rest of the algorithm do the work
1443
 
        # We know there is nothing uninteresting, and we didn't have to read
1444
 
        # any interesting records yet.
1445
 
        return (set(), set(), set(interesting_root_keys), [], set())
1446
 
    all_uninteresting_chks = set(uninteresting_root_keys)
1447
 
    all_uninteresting_items = set()
1448
 
 
1449
 
    # First step, find the direct children of both the interesting and
1450
 
    # uninteresting set
1451
 
    (uninteresting_keys, uninteresting_items,
1452
 
     interesting_keys, interesting_to_yield,
1453
 
     interesting_items) = _find_children_info(store, interesting_root_keys,
1454
 
                                              uninteresting_root_keys,
1455
 
                                              pb=pb)
1456
 
    all_uninteresting_chks.update(uninteresting_keys)
1457
 
    all_uninteresting_items.update(uninteresting_items)
1458
 
    del uninteresting_items
1459
 
    # Note: Exact matches between interesting and uninteresting do not need
1460
 
    #       to be search further. Non-exact matches need to be searched in case
1461
 
    #       there is a future exact-match
1462
 
    uninteresting_keys.difference_update(interesting_keys)
1463
 
 
1464
 
    # Second, find the full set of uninteresting bits reachable by the
1465
 
    # uninteresting roots
1466
 
    chks_to_read = uninteresting_keys
1467
 
    while chks_to_read:
1468
 
        next_chks = set()
1469
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1470
 
            # TODO: Handle 'absent'
1471
 
            if pb is not None:
1472
 
                pb.tick()
 
1399
class CHKMapDifference(object):
 
1400
    """Iterate the stored pages and key,value pairs for (new - old).
 
1401
 
 
1402
    This class provides a generator over the stored CHK pages and the
 
1403
    (key, value) pairs that are in any of the new maps and not in any of the
 
1404
    old maps.
 
1405
 
 
1406
    Note that it may yield chk pages that are common (especially root nodes),
 
1407
    but it won't yield (key,value) pairs that are common.
 
1408
    """
 
1409
 
 
1410
    def __init__(self, store, new_root_keys, old_root_keys,
 
1411
                 search_key_func, pb=None):
 
1412
        self._store = store
 
1413
        self._new_root_keys = new_root_keys
 
1414
        self._old_root_keys = old_root_keys
 
1415
        self._pb = pb
 
1416
        # All uninteresting chks that we have seen. By the time they are added
 
1417
        # here, they should be either fully ignored, or queued up for
 
1418
        # processing
 
1419
        self._all_old_chks = set(self._old_root_keys)
 
1420
        # All items that we have seen from the old_root_keys
 
1421
        self._all_old_items = set()
 
1422
        # These are interesting items which were either read, or already in the
 
1423
        # interesting queue (so we don't need to walk them again)
 
1424
        self._processed_new_refs = set()
 
1425
        self._search_key_func = search_key_func
 
1426
 
 
1427
        # The uninteresting and interesting nodes to be searched
 
1428
        self._old_queue = []
 
1429
        self._new_queue = []
 
1430
        # Holds the (key, value) items found when processing the root nodes,
 
1431
        # waiting for the uninteresting nodes to be walked
 
1432
        self._new_item_queue = []
 
1433
        self._state = None
 
1434
 
 
1435
    def _read_nodes_from_store(self, keys):
 
1436
        # We chose not to use _page_cache, because we think in terms of records
 
1437
        # to be yielded. Also, we expect to touch each page only 1 time during
 
1438
        # this code. (We may want to evaluate saving the raw bytes into the
 
1439
        # page cache, which would allow a working tree update after the fetch
 
1440
        # to not have to read the bytes again.)
 
1441
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1442
        for record in stream:
 
1443
            if self._pb is not None:
 
1444
                self._pb.tick()
 
1445
            if record.storage_kind == 'absent':
 
1446
                raise errors.NoSuchRevision(self._store, record.key)
1473
1447
            bytes = record.get_bytes_as('fulltext')
1474
 
            # We don't care about search_key_func for this code, because we
1475
 
            # only care about external references.
1476
 
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1448
            node = _deserialise(bytes, record.key,
 
1449
                                search_key_func=self._search_key_func)
1477
1450
            if type(node) is InternalNode:
1478
 
                # uninteresting_prefix_chks.update(node._items.iteritems())
1479
 
                chks = node._items.values()
1480
 
                # TODO: We remove the entries that are already in
1481
 
                #       uninteresting_chks ?
1482
 
                next_chks.update(chks)
1483
 
                all_uninteresting_chks.update(chks)
 
1451
                # Note we don't have to do node.refs() because we know that
 
1452
                # there are no children that have been pushed into this node
 
1453
                prefix_refs = node._items.items()
 
1454
                items = []
1484
1455
            else:
1485
 
                all_uninteresting_items.update(node._items.iteritems())
1486
 
        chks_to_read = next_chks
1487
 
    return (all_uninteresting_chks, all_uninteresting_items,
1488
 
            interesting_keys, interesting_to_yield, interesting_items)
 
1456
                prefix_refs = []
 
1457
                items = node._items.items()
 
1458
            yield record, node, prefix_refs, items
 
1459
 
 
1460
    def _read_old_roots(self):
 
1461
        old_chks_to_enqueue = []
 
1462
        all_old_chks = self._all_old_chks
 
1463
        for record, node, prefix_refs, items in \
 
1464
                self._read_nodes_from_store(self._old_root_keys):
 
1465
            # Uninteresting node
 
1466
            prefix_refs = [p_r for p_r in prefix_refs
 
1467
                                if p_r[1] not in all_old_chks]
 
1468
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1469
            all_old_chks.update(new_refs)
 
1470
            self._all_old_items.update(items)
 
1471
            # Queue up the uninteresting references
 
1472
            # Don't actually put them in the 'to-read' queue until we have
 
1473
            # finished checking the interesting references
 
1474
            old_chks_to_enqueue.extend(prefix_refs)
 
1475
        return old_chks_to_enqueue
 
1476
 
 
1477
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1478
        # At this point, we have read all the uninteresting and interesting
 
1479
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1480
        # handled the interesting ones
 
1481
        for prefix, ref in old_chks_to_enqueue:
 
1482
            not_interesting = True
 
1483
            for i in xrange(len(prefix), 0, -1):
 
1484
                if prefix[:i] in new_prefixes:
 
1485
                    not_interesting = False
 
1486
                    break
 
1487
            if not_interesting:
 
1488
                # This prefix is not part of the remaining 'interesting set'
 
1489
                continue
 
1490
            self._old_queue.append(ref)
 
1491
 
 
1492
    def _read_all_roots(self):
 
1493
        """Read the root pages.
 
1494
 
 
1495
        This is structured as a generator, so that the root records can be
 
1496
        yielded up to whoever needs them without any buffering.
 
1497
        """
 
1498
        # This is the bootstrap phase
 
1499
        if not self._old_root_keys:
 
1500
            # With no old_root_keys we can just shortcut and be ready
 
1501
            # for _flush_new_queue
 
1502
            self._new_queue = list(self._new_root_keys)
 
1503
            return
 
1504
        old_chks_to_enqueue = self._read_old_roots()
 
1505
        # filter out any root keys that are already known to be uninteresting
 
1506
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1507
        # These are prefixes that are present in new_keys that we are
 
1508
        # thinking to yield
 
1509
        new_prefixes = set()
 
1510
        # We are about to yield all of these, so we don't want them getting
 
1511
        # added a second time
 
1512
        processed_new_refs = self._processed_new_refs
 
1513
        processed_new_refs.update(new_keys)
 
1514
        for record, node, prefix_refs, items in \
 
1515
                self._read_nodes_from_store(new_keys):
 
1516
            # At this level, we now know all the uninteresting references
 
1517
            # So we filter and queue up whatever is remaining
 
1518
            prefix_refs = [p_r for p_r in prefix_refs
 
1519
                           if p_r[1] not in self._all_old_chks
 
1520
                              and p_r[1] not in processed_new_refs]
 
1521
            refs = [p_r[1] for p_r in prefix_refs]
 
1522
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1523
            self._new_queue.extend(refs)
 
1524
            # TODO: We can potentially get multiple items here, however the
 
1525
            #       current design allows for this, as callers will do the work
 
1526
            #       to make the results unique. We might profile whether we
 
1527
            #       gain anything by ensuring unique return values for items
 
1528
            new_items = [item for item in items
 
1529
                               if item not in self._all_old_items]
 
1530
            self._new_item_queue.extend(new_items)
 
1531
            new_prefixes.update([self._search_key_func(item[0])
 
1532
                                 for item in new_items])
 
1533
            processed_new_refs.update(refs)
 
1534
            yield record
 
1535
        # For new_prefixes we have the full length prefixes queued up.
 
1536
        # However, we also need possible prefixes. (If we have a known ref to
 
1537
        # 'ab', then we also need to include 'a'.) So expand the
 
1538
        # new_prefixes to include all shorter prefixes
 
1539
        for prefix in list(new_prefixes):
 
1540
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
 
1541
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1542
 
 
1543
    def _flush_new_queue(self):
 
1544
        # No need to maintain the heap invariant anymore, just pull things out
 
1545
        # and process them
 
1546
        refs = set(self._new_queue)
 
1547
        self._new_queue = []
 
1548
        # First pass, flush all interesting items and convert to using direct refs
 
1549
        all_old_chks = self._all_old_chks
 
1550
        processed_new_refs = self._processed_new_refs
 
1551
        all_old_items = self._all_old_items
 
1552
        new_items = [item for item in self._new_item_queue
 
1553
                           if item not in all_old_items]
 
1554
        self._new_item_queue = []
 
1555
        if new_items:
 
1556
            yield None, new_items
 
1557
        refs = refs.difference(all_old_chks)
 
1558
        while refs:
 
1559
            next_refs = set()
 
1560
            next_refs_update = next_refs.update
 
1561
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1562
            # from 1m54s to 1m51s. Consider it.
 
1563
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1564
                items = [item for item in items
 
1565
                         if item not in all_old_items]
 
1566
                yield record, items
 
1567
                next_refs_update([p_r[1] for p_r in p_refs])
 
1568
            next_refs = next_refs.difference(all_old_chks)
 
1569
            next_refs = next_refs.difference(processed_new_refs)
 
1570
            processed_new_refs.update(next_refs)
 
1571
            refs = next_refs
 
1572
 
 
1573
    def _process_next_old(self):
 
1574
        # Since we don't filter uninteresting any further than during
 
1575
        # _read_all_roots, process the whole queue in a single pass.
 
1576
        refs = self._old_queue
 
1577
        self._old_queue = []
 
1578
        all_old_chks = self._all_old_chks
 
1579
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1580
            self._all_old_items.update(items)
 
1581
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1582
            self._old_queue.extend(refs)
 
1583
            all_old_chks.update(refs)
 
1584
 
 
1585
    def _process_queues(self):
 
1586
        while self._old_queue:
 
1587
            self._process_next_old()
 
1588
        return self._flush_new_queue()
 
1589
 
 
1590
    def process(self):
 
1591
        for record in self._read_all_roots():
 
1592
            yield record, []
 
1593
        for record, items in self._process_queues():
 
1594
            yield record, items
1489
1595
 
1490
1596
 
1491
1597
def iter_interesting_nodes(store, interesting_root_keys,
1502
1608
    :return: Yield
1503
1609
        (interesting record, {interesting key:values})
1504
1610
    """
1505
 
    # TODO: consider that it may be more memory efficient to use the 20-byte
1506
 
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
1507
 
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
1508
 
    #       helper function similar to _read_bytes. This function should be
1509
 
    #       able to use nodes from the _page_cache as well as actually
1510
 
    #       requesting bytes from the store.
1511
 
 
1512
 
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1513
 
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1514
 
        interesting_root_keys, uninteresting_root_keys, pb)
1515
 
 
1516
 
    # Now that we know everything uninteresting, we can yield information from
1517
 
    # our first request
1518
 
    interesting_items.difference_update(all_uninteresting_items)
1519
 
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1520
 
    if interesting_items:
1521
 
        yield None, interesting_items
1522
 
    if interesting_to_yield:
1523
 
        # We request these records again, rather than buffering the root
1524
 
        # records, most likely they are still in the _group_cache anyway.
1525
 
        for record in store.get_record_stream(interesting_to_yield,
1526
 
                                              'unordered', False):
1527
 
            yield record, []
1528
 
    all_uninteresting_chks.update(interesting_to_yield)
1529
 
    interesting_keys.difference_update(all_uninteresting_chks)
1530
 
 
1531
 
    chks_to_read = interesting_keys
1532
 
    counter = 0
1533
 
    while chks_to_read:
1534
 
        next_chks = set()
1535
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1536
 
            counter += 1
1537
 
            if pb is not None:
1538
 
                pb.update('find chk pages', counter)
1539
 
            # TODO: Handle 'absent'?
1540
 
            bytes = record.get_bytes_as('fulltext')
1541
 
            # We don't care about search_key_func for this code, because we
1542
 
            # only care about external references.
1543
 
            node = _deserialise(bytes, record.key, search_key_func=None)
1544
 
            if type(node) is InternalNode:
1545
 
                # all_uninteresting_chks grows large, as it lists all nodes we
1546
 
                # don't want to process (including already seen interesting
1547
 
                # nodes).
1548
 
                # small.difference_update(large) scales O(large), but
1549
 
                # small.difference(large) scales O(small).
1550
 
                # Also, we know we just _deserialised this node, so we can
1551
 
                # access the dict directly.
1552
 
                chks = set(node._items.itervalues()).difference(
1553
 
                            all_uninteresting_chks)
1554
 
                # Is set() and .difference_update better than:
1555
 
                # chks = [chk for chk in node.refs()
1556
 
                #              if chk not in all_uninteresting_chks]
1557
 
                next_chks.update(chks)
1558
 
                # These are now uninteresting everywhere else
1559
 
                all_uninteresting_chks.update(chks)
1560
 
                interesting_items = []
1561
 
            else:
1562
 
                interesting_items = [item for item in node._items.iteritems()
1563
 
                                     if item not in all_uninteresting_items]
1564
 
                # TODO: Do we need to filter out items that we have already
1565
 
                #       seen on other pages? We don't really want to buffer the
1566
 
                #       whole thing, but it does mean that callers need to
1567
 
                #       understand they may get duplicate values.
1568
 
                # all_uninteresting_items.update(interesting_items)
1569
 
            yield record, interesting_items
1570
 
        chks_to_read = next_chks
 
1611
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1612
                                uninteresting_root_keys,
 
1613
                                search_key_func=store._search_key_func,
 
1614
                                pb=pb)
 
1615
    return iterator.process()
1571
1616
 
1572
1617
 
1573
1618
try: