1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
3
3
# This program is free software; you can redistribute it and/or modify
 
4
4
# it under the terms of the GNU General Public License as published by
 
 
57
 
from bzrlib.static_tuple import StaticTuple
 
60
57
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
 
61
58
# out, it takes 3.1MB to cache the layer.
 
62
59
_PAGE_CACHE_SIZE = 4*1024*1024
 
63
 
# Per thread caches for 2 reasons:
 
64
 
# - in the server we may be serving very different content, so we get less
 
66
 
# - we avoid locking on every cache lookup.
 
67
 
_thread_caches = threading.local()
 
69
 
_thread_caches.page_cache = None
 
72
 
    """Get the per-thread page cache.
 
74
 
    We need a function to do this because in a new thread the _thread_caches
 
75
 
    threading.local object does not have the cache initialized yet.
 
77
 
    page_cache = getattr(_thread_caches, 'page_cache', None)
 
78
 
    if page_cache is None:
 
79
 
        # We are caching bytes so len(value) is perfectly accurate
 
80
 
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
 
81
 
        _thread_caches.page_cache = page_cache
 
 
60
# We are caching bytes so len(value) is perfectly accurate
 
 
61
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
 
89
66
# If a ChildNode falls below this many bytes, we check for a remap
 
90
67
_INTERESTING_NEW_SIZE = 50
 
 
106
83
class CHKMap(object):
 
107
84
    """A persistent map from string to string backed by a CHK store."""
 
109
 
    __slots__ = ('_store', '_root_node', '_search_key_func')
 
111
86
    def __init__(self, store, root_key, search_key_func=None):
 
112
87
        """Create a CHKMap object.
 
 
139
114
        # Check preconditions first.
 
140
 
        as_st = StaticTuple.from_sequence
 
141
 
        new_items = set([as_st(key) for (old, key, value) in delta
 
142
 
                         if key is not None and old is None])
 
 
115
        new_items = set([key for (old, key, value) in delta if key is not None
 
143
117
        existing_new = list(self.iteritems(key_filter=new_items))
 
145
119
            raise errors.InconsistentDeltaDelta(delta,
 
 
173
147
        :param node: A tuple key or node object.
 
174
148
        :return: A node object.
 
176
 
        if type(node) is StaticTuple:
 
 
150
        if type(node) is tuple:
 
177
151
            bytes = self._read_bytes(node)
 
178
152
            return _deserialise(bytes, node,
 
179
153
                search_key_func=self._search_key_func)
 
 
183
157
    def _read_bytes(self, key):
 
185
 
            return _get_cache()[key]
 
 
159
            return _page_cache[key]
 
187
161
            stream = self._store.get_record_stream([key], 'unordered', True)
 
188
162
            bytes = stream.next().get_bytes_as('fulltext')
 
189
 
            _get_cache()[key] = bytes
 
 
163
            _page_cache[key] = bytes
 
192
166
    def _dump_tree(self, include_keys=False):
 
 
220
194
            for key, value in sorted(node._items.iteritems()):
 
221
195
                # Don't use prefix nor indent here to line up when used in
 
222
196
                # tests in conjunction with assertEqualDiff
 
223
 
                result.append('      %r %r' % (tuple(key), value))
 
 
197
                result.append('      %r %r' % (key, value))
 
 
244
218
        root_key = klass._create_directly(store, initial_value,
 
245
219
            maximum_size=maximum_size, key_width=key_width,
 
246
220
            search_key_func=search_key_func)
 
247
 
        if type(root_key) is not StaticTuple:
 
248
 
            raise AssertionError('we got a %s instead of a StaticTuple'
 
 
267
238
        node = LeafNode(search_key_func=search_key_func)
 
268
239
        node.set_maximum_size(maximum_size)
 
269
240
        node._key_width = key_width
 
270
 
        as_st = StaticTuple.from_sequence
 
271
 
        node._items = dict([(as_st(key), val) for key, val
 
272
 
                                               in initial_value.iteritems()])
 
 
241
        node._items = dict(initial_value)
 
273
242
        node._raw_size = sum([node._key_value_len(key, value)
 
274
 
                              for key,value in node._items.iteritems()])
 
 
243
                              for key,value in initial_value.iteritems()])
 
275
244
        node._len = len(node._items)
 
276
245
        node._compute_search_prefix()
 
277
246
        node._compute_serialised_prefix()
 
 
513
482
    def iteritems(self, key_filter=None):
 
514
483
        """Iterate over the entire CHKMap's contents."""
 
515
484
        self._ensure_root()
 
516
 
        if key_filter is not None:
 
517
 
            as_st = StaticTuple.from_sequence
 
518
 
            key_filter = [as_st(key) for key in key_filter]
 
519
485
        return self._root_node.iteritems(self._store, key_filter=key_filter)
 
522
488
        """Return the key for this map."""
 
523
 
        if type(self._root_node) is StaticTuple:
 
 
489
        if type(self._root_node) is tuple:
 
524
490
            return self._root_node
 
526
492
            return self._root_node._key
 
 
535
501
        :param key: A key to map.
 
536
502
        :param value: The value to assign to key.
 
538
 
        key = StaticTuple.from_sequence(key)
 
539
504
        # Need a root object.
 
540
505
        self._ensure_root()
 
541
506
        prefix, node_details = self._root_node.map(self._store, key, value)
 
 
552
517
    def _node_key(self, node):
 
553
518
        """Get the key for a node whether it's a tuple or node."""
 
554
519
        if type(node) is tuple:
 
555
 
            node = StaticTuple.from_sequence(node)
 
556
 
        if type(node) is StaticTuple:
 
561
524
    def unmap(self, key, check_remap=True):
 
562
525
        """remove key from the map."""
 
563
 
        key = StaticTuple.from_sequence(key)
 
564
526
        self._ensure_root()
 
565
527
        if type(self._root_node) is InternalNode:
 
566
528
            unmapped = self._root_node.unmap(self._store, key,
 
 
594
556
        adding the header bytes, and without prefix compression.
 
597
 
    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
 
598
 
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
 
601
559
    def __init__(self, key_width=1):
 
602
560
        """Create a node.
 
 
692
650
        the key/value pairs.
 
695
 
    __slots__ = ('_common_serialised_prefix',)
 
697
653
    def __init__(self, search_key_func=None):
 
698
654
        Node.__init__(self)
 
699
655
        # All of the keys in this leaf node share this common prefix
 
700
656
        self._common_serialised_prefix = None
 
 
657
        self._serialise_key = '\x00'.join
 
701
658
        if search_key_func is None:
 
702
659
            self._search_key_func = _search_key_plain
 
 
741
698
        :param bytes: The bytes of the node.
 
742
699
        :param key: The key that the serialised node has.
 
744
 
        key = static_tuple.expect_static_tuple(key)
 
745
701
        return _deserialise_leaf_node(bytes, key,
 
746
702
                                      search_key_func=search_key_func)
 
 
887
843
                raise AssertionError('%r must be known' % self._search_prefix)
 
888
844
            return self._search_prefix, [("", self)]
 
890
 
    _serialise_key = '\x00'.join
 
892
846
    def serialise(self, store):
 
893
847
        """Serialise the LeafNode to store.
 
 
919
873
            lines.append(serialized[prefix_len:])
 
920
874
            lines.extend(value_lines)
 
921
875
        sha1, _, _ = store.add_lines((None,), (), lines)
 
922
 
        self._key = StaticTuple("sha1:" + sha1,).intern()
 
 
876
        self._key = ("sha1:" + sha1,)
 
923
877
        bytes = ''.join(lines)
 
924
878
        if len(bytes) != self._current_size():
 
925
879
            raise AssertionError('Invalid _current_size')
 
926
 
        _get_cache().add(self._key, bytes)
 
 
880
        _page_cache.add(self._key, bytes)
 
927
881
        return [self._key]
 
 
993
947
        LeafNode or InternalNode.
 
996
 
    __slots__ = ('_node_width',)
 
998
950
    def __init__(self, prefix='', search_key_func=None):
 
999
951
        Node.__init__(self)
 
1000
952
        # The size of an internalnode with default values and no children.
 
 
1042
994
        :param key: The key that the serialised node has.
 
1043
995
        :return: An InternalNode instance.
 
1045
 
        key = static_tuple.expect_static_tuple(key)
 
1046
997
        return _deserialise_internal_node(bytes, key,
 
1047
998
                                          search_key_func=search_key_func)
 
 
1073
1024
            # for whatever we are missing
 
1074
1025
            shortcut = True
 
1075
1026
            for prefix, node in self._items.iteritems():
 
1076
 
                if node.__class__ is StaticTuple:
 
 
1027
                if node.__class__ is tuple:
 
1077
1028
                    keys[node] = (prefix, None)
 
1079
1030
                    yield node, None
 
 
1108
1059
                    # A given key can only match 1 child node, if it isn't
 
1109
1060
                    # there, then we can just return nothing
 
1111
 
                if node.__class__ is StaticTuple:
 
 
1062
                if node.__class__ is tuple:
 
1112
1063
                    keys[node] = (search_prefix, [key])
 
1114
1065
                    # This is loaded, and the only thing that can match,
 
 
1141
1092
                        # We can ignore this one
 
1143
1094
                    node_key_filter = prefix_to_keys[search_prefix]
 
1144
 
                    if node.__class__ is StaticTuple:
 
 
1095
                    if node.__class__ is tuple:
 
1145
1096
                        keys[node] = (search_prefix, node_key_filter)
 
1147
1098
                        yield node, node_key_filter
 
 
1156
1107
                        if sub_prefix in length_filter:
 
1157
1108
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
 
1158
1109
                    if node_key_filter: # this key matched something, yield it
 
1159
 
                        if node.__class__ is StaticTuple:
 
 
1110
                        if node.__class__ is tuple:
 
1160
1111
                            keys[node] = (prefix, node_key_filter)
 
1162
1113
                            yield node, node_key_filter
 
 
1196
1147
                    prefix, node_key_filter = keys[record.key]
 
1197
1148
                    node_and_filters.append((node, node_key_filter))
 
1198
1149
                    self._items[prefix] = node
 
1199
 
                    _get_cache().add(record.key, bytes)
 
 
1150
                    _page_cache.add(record.key, bytes)
 
1200
1151
                for info in node_and_filters:
 
 
1311
1262
        lines.append('%s\n' % (self._search_prefix,))
 
1312
1263
        prefix_len = len(self._search_prefix)
 
1313
1264
        for prefix, node in sorted(self._items.items()):
 
1314
 
            if type(node) is StaticTuple:
 
 
1265
            if type(node) is tuple:
 
1317
1268
                key = node._key[0]
 
 
1321
1272
                    % (serialised, self._search_prefix))
 
1322
1273
            lines.append(serialised[prefix_len:])
 
1323
1274
        sha1, _, _ = store.add_lines((None,), (), lines)
 
1324
 
        self._key = StaticTuple("sha1:" + sha1,).intern()
 
1325
 
        _get_cache().add(self._key, ''.join(lines))
 
 
1275
        self._key = ("sha1:" + sha1,)
 
 
1276
        _page_cache.add(self._key, ''.join(lines))
 
1326
1277
        yield self._key
 
1328
1279
    def _search_key(self, key):
 
 
1477
1428
    def __init__(self, store, new_root_keys, old_root_keys,
 
1478
1429
                 search_key_func, pb=None):
 
1479
 
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
 
1480
 
        #       force callers to use StaticTuple, because there will often be
 
1481
 
        #       lots of keys passed in here. And even if we cast it locally,
 
1482
 
        #       that just meanst that we will have *both* a StaticTuple and a
 
1483
 
        #       tuple() in memory, referring to the same object. (so a net
 
1484
 
        #       increase in memory, not a decrease.)
 
1485
1430
        self._store = store
 
1486
1431
        self._new_root_keys = new_root_keys
 
1487
1432
        self._old_root_keys = old_root_keys
 
 
1489
1434
        # All uninteresting chks that we have seen. By the time they are added
 
1490
1435
        # here, they should be either fully ignored, or queued up for
 
1492
 
        # TODO: This might grow to a large size if there are lots of merge
 
1493
 
        #       parents, etc. However, it probably doesn't scale to O(history)
 
1494
 
        #       like _processed_new_refs does.
 
1495
1437
        self._all_old_chks = set(self._old_root_keys)
 
1496
1438
        # All items that we have seen from the old_root_keys
 
1497
1439
        self._all_old_items = set()
 
1498
1440
        # These are interesting items which were either read, or already in the
 
1499
1441
        # interesting queue (so we don't need to walk them again)
 
1500
 
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
 
1502
1442
        self._processed_new_refs = set()
 
1503
1443
        self._search_key_func = search_key_func
 
 
1511
1451
        self._state = None
 
1513
1453
    def _read_nodes_from_store(self, keys):
 
1514
 
        # We chose not to use _get_cache(), because we think in
 
1515
 
        # terms of records to be yielded. Also, we expect to touch each page
 
1516
 
        # only 1 time during this code. (We may want to evaluate saving the
 
1517
 
        # raw bytes into the page cache, which would allow a working tree
 
1518
 
        # update after the fetch to not have to read the bytes again.)
 
1519
 
        as_st = StaticTuple.from_sequence
 
 
1454
        # We chose not to use _page_cache, because we think in terms of records
 
 
1455
        # to be yielded. Also, we expect to touch each page only 1 time during
 
 
1456
        # this code. (We may want to evaluate saving the raw bytes into the
 
 
1457
        # page cache, which would allow a working tree update after the fetch
 
 
1458
        # to not have to read the bytes again.)
 
1520
1459
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1521
1460
        for record in stream:
 
1522
1461
            if self._pb is not None:
 
 
1529
1468
            if type(node) is InternalNode:
 
1530
1469
                # Note we don't have to do node.refs() because we know that
 
1531
1470
                # there are no children that have been pushed into this node
 
1532
 
                # Note: Using as_st() here seemed to save 1.2MB, which would
 
1533
 
                #       indicate that we keep 100k prefix_refs around while
 
1534
 
                #       processing. They *should* be shorter lived than that...
 
1535
 
                #       It does cost us ~10s of processing time
 
1536
 
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
 
1537
1471
                prefix_refs = node._items.items()
 
1540
1474
                prefix_refs = []
 
1541
 
                # Note: We don't use a StaticTuple here. Profiling showed a
 
1542
 
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
 
1543
 
                #       But a significant slowdown (15s / 145s, or 10%)
 
1544
1475
                items = node._items.items()
 
1545
1476
            yield record, node, prefix_refs, items
 
 
1554
1485
                                if p_r[1] not in all_old_chks]
 
1555
1486
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1556
1487
            all_old_chks.update(new_refs)
 
1557
 
            # TODO: This might be a good time to turn items into StaticTuple
 
1558
 
            #       instances and possibly intern them. However, this does not
 
1559
 
            #       impact 'initial branch' performance, so I'm not worrying
 
1561
1488
            self._all_old_items.update(items)
 
1562
1489
            # Queue up the uninteresting references
 
1563
1490
            # Don't actually put them in the 'to-read' queue until we have
 
 
1616
1543
            #       current design allows for this, as callers will do the work
 
1617
1544
            #       to make the results unique. We might profile whether we
 
1618
1545
            #       gain anything by ensuring unique return values for items
 
1619
 
            # TODO: This might be a good time to cast to StaticTuple, as
 
1620
 
            #       self._new_item_queue will hold the contents of multiple
 
1621
 
            #       records for an extended lifetime
 
1622
1546
            new_items = [item for item in items
 
1623
1547
                               if item not in self._all_old_items]
 
1624
1548
            self._new_item_queue.extend(new_items)
 
 
1650
1574
            yield None, new_items
 
1651
1575
        refs = refs.difference(all_old_chks)
 
1652
 
        processed_new_refs.update(refs)
 
1654
 
            # TODO: Using a SimpleSet for self._processed_new_refs and
 
1655
 
            #       saved as much as 10MB of peak memory. However, it requires
 
1656
 
            #       implementing a non-pyrex version.
 
1657
1577
            next_refs = set()
 
1658
1578
            next_refs_update = next_refs.update
 
1659
1579
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1660
1580
            # from 1m54s to 1m51s. Consider it.
 
1661
1581
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1663
 
                    # using the 'if' check saves about 145s => 141s, when
 
1664
 
                    # streaming initial branch of Launchpad data.
 
1665
 
                    items = [item for item in items
 
1666
 
                             if item not in all_old_items]
 
 
1582
                items = [item for item in items
 
 
1583
                         if item not in all_old_items]
 
1667
1584
                yield record, items
 
1668
1585
                next_refs_update([p_r[1] for p_r in p_refs])
 
1670
 
            # set1.difference(set/dict) walks all of set1, and checks if it
 
1671
 
            # exists in 'other'.
 
1672
 
            # set1.difference(iterable) walks all of iterable, and does a
 
1673
 
            # 'difference_update' on a clone of set1. Pick wisely based on the
 
1674
 
            # expected sizes of objects.
 
1675
 
            # in our case it is expected that 'new_refs' will always be quite
 
1677
1586
            next_refs = next_refs.difference(all_old_chks)
 
1678
1587
            next_refs = next_refs.difference(processed_new_refs)
 
1679
1588
            processed_new_refs.update(next_refs)
 
 
1686
1595
        self._old_queue = []
 
1687
1596
        all_old_chks = self._all_old_chks
 
1688
1597
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1689
 
            # TODO: Use StaticTuple here?
 
1690
1598
            self._all_old_items.update(items)
 
1691
1599
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1692
1600
            self._old_queue.extend(refs)
 
 
1743
1650
search_key_registry.register('hash-16-way', _search_key_16)
 
1744
1651
search_key_registry.register('hash-255-way', _search_key_255)
 
1747
 
def _check_key(key):
 
1748
 
    """Helper function to assert that a key is properly formatted.
 
1750
 
    This generally shouldn't be used in production code, but it can be helpful
 
1753
 
    if type(key) is not StaticTuple:
 
1754
 
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
 
1756
 
        raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
 
1757
 
    if type(key[0]) is not str:
 
1758
 
        raise TypeError('key %r should hold a str, not %r'
 
1759
 
                        % (key, type(key[0])))
 
1760
 
    if not key[0].startswith('sha1:'):
 
1761
 
        raise ValueError('key %r should point to a sha1:' % (key,))