-# Copyright (C) 2008, 2009 Canonical Ltd
+# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by

 # If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
 # out, it takes 3.1MB to cache the layer.
 _PAGE_CACHE_SIZE = 4*1024*1024
-# We are caching bytes so len(value) is perfectly accurate
-_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
+# Per thread caches for 2 reasons:
+# - in the server we may be serving very different content, so we get less
+#   cache thrashing.
+# - we avoid locking on every cache lookup.
+_thread_caches = threading.local()
+_thread_caches.page_cache = None
+
+def _get_cache():
+    """Get the per-thread page cache.
+
+    We need a function to do this because in a new thread the _thread_caches
+    threading.local object does not have the cache initialized yet.
+    """
+    page_cache = getattr(_thread_caches, 'page_cache', None)
+    if page_cache is None:
+        # We are caching bytes so len(value) is perfectly accurate
+        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
+        _thread_caches.page_cache = page_cache
+    return page_cache
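
For reference, the "3.1MB" figure above is simply 255 internal pages x 255-way
fan-out x ~50 bytes per line = 3,251,250 bytes, roughly 3.1MiB, which is why a
4MB _PAGE_CACHE_SIZE comfortably holds a full layer. The snippet below is a
minimal standalone sketch of the per-thread lazy-initialisation pattern that
_get_cache() uses; a plain dict stands in for bzrlib's lru_cache.LRUSizeCache
so it runs without bzrlib, and the worker function and thread names are
invented for illustration.

    import threading

    _thread_caches = threading.local()

    def _get_cache():
        # getattr() is needed because a brand-new thread sees an empty
        # threading.local with no 'page_cache' attribute set on it yet.
        cache = getattr(_thread_caches, 'page_cache', None)
        if cache is None:
            cache = {}  # the real code uses lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
            _thread_caches.page_cache = cache
        return cache

    def worker(name):
        _get_cache()[name] = 'some page bytes'
        # Each thread sees only the entries it added itself.
        assert list(_get_cache()) == [name]

    threads = [threading.Thread(target=worker, args=('t%d' % i,))
               for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Because each thread lazily builds its own cache, lookups need no lock and a
server thread serving very different content cannot thrash the pages another
thread is relying on, which is the rationale given in the comment above.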

 # If a ChildNode falls below this many bytes, we check for a remap
 _INTERESTING_NEW_SIZE = 50

     def _read_bytes(self, key):
         try:
-            return _page_cache[key]
+            return _get_cache()[key]
         except KeyError:
             stream = self._store.get_record_stream([key], 'unordered', True)
             bytes = stream.next().get_bytes_as('fulltext')
-            _page_cache[key] = bytes
+            _get_cache()[key] = bytes
             return bytes

     def _dump_tree(self, include_keys=False):
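
The method above is the standard look-aside idiom: try the per-thread cache,
fall back to the store, then populate the cache on the way out. LRUSizeCache
(from bzrlib.lru_cache) evicts by total byte size rather than by entry count,
which is why the comment notes that caching raw byte strings makes len(value)
an exact size measure. A toy round-trip, assuming bzrlib is importable and
using a made-up key:

    from bzrlib import lru_cache

    cache = lru_cache.LRUSizeCache(4*1024*1024)
    cache.add(('sha1:aaaa',), 'x' * 1000)
    assert cache[('sha1:aaaa',)] == 'x' * 1000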

         root_key = klass._create_directly(store, initial_value,
             maximum_size=maximum_size, key_width=key_width,
             search_key_func=search_key_func)
-        assert type(root_key) is StaticTuple
+        if type(root_key) is not StaticTuple:
+            raise AssertionError('we got a %s instead of a StaticTuple'
+                                 % (type(root_key),))

             for split, subnode in node_details:
                 node.add_node(split, subnode)
         keys = list(node.serialise(store))
-        assert (type(root_node) is StaticTuple
-                and len(root_node) == 1 and
-                type(root_node[0]) is str)

     def iter_changes(self, basis):
         """Iterate over the changes between basis and self.

     def iteritems(self, key_filter=None):
         """Iterate over the entire CHKMap's contents."""
         self._ensure_root()
-        # TODO: StaticTuple Barrier here
         if key_filter is not None:
             as_st = StaticTuple.from_sequence
             key_filter = [as_st(key) for key in key_filter]

     the key/value pairs.

-    __slots__ = ('_common_serialised_prefix', '_serialise_key')
+    __slots__ = ('_common_serialised_prefix',)

     def __init__(self, search_key_func=None):
         Node.__init__(self)
         # All of the keys in this leaf node share this common prefix
         self._common_serialised_prefix = None
-        self._serialise_key = '\x00'.join
         if search_key_func is None:
             self._search_key_func = _search_key_plain

         :param bytes: The bytes of the node.
         :param key: The key that the serialised node has.
         """
+        key = static_tuple.expect_static_tuple(key)
         return _deserialise_leaf_node(bytes, key,
                                       search_key_func=search_key_func)

             raise AssertionError('%r must be known' % self._search_prefix)
         return self._search_prefix, [("", self)]

+    _serialise_key = '\x00'.join
+
     def serialise(self, store):
         """Serialise the LeafNode to store.

         bytes = ''.join(lines)
         if len(bytes) != self._current_size():
             raise AssertionError('Invalid _current_size')
-        _page_cache.add(self._key, bytes)
+        _get_cache().add(self._key, bytes)
         return [self._key]

         :param key: The key that the serialised node has.
         :return: An InternalNode instance.
         """
-        if type(key) is not StaticTuple:
-            import pdb; pdb.set_trace()
-            key = StaticTuple.from_sequence(key).intern()
+        key = static_tuple.expect_static_tuple(key)
         return _deserialise_internal_node(bytes, key,
                                           search_key_func=search_key_func)

                 prefix, node_key_filter = keys[record.key]
                 node_and_filters.append((node, node_key_filter))
                 self._items[prefix] = node
-                _page_cache.add(record.key, bytes)
+                _get_cache().add(record.key, bytes)
             for info in node_and_filters:

             lines.append(serialised[prefix_len:])
         sha1, _, _ = store.add_lines((None,), (), lines)
         self._key = StaticTuple("sha1:" + sha1,).intern()
-        _page_cache.add(self._key, ''.join(lines))
+        _get_cache().add(self._key, ''.join(lines))
         yield self._key

     def _search_key(self, key):

     def __init__(self, store, new_root_keys, old_root_keys,
                  search_key_func, pb=None):
+        # TODO: Should we add a StaticTuple barrier here? It would be nice to
+        #       force callers to use StaticTuple, because there will often be
+        #       lots of keys passed in here. And even if we cast it locally,
+        #       that just means that we will have *both* a StaticTuple and a
+        #       tuple() in memory, referring to the same object. (so a net
+        #       increase in memory, not a decrease.)
         self._store = store
         self._new_root_keys = new_root_keys
         self._old_root_keys = old_root_keys
         # All uninteresting chks that we have seen. By the time they are added
         # here, they should be either fully ignored, or queued up for
         # processing
+        # TODO: This might grow to a large size if there are lots of merge
+        #       parents, etc. However, it probably doesn't scale to O(history)
+        #       like _processed_new_refs does.
         self._all_old_chks = set(self._old_root_keys)
         # All items that we have seen from the old_root_keys
         self._all_old_items = set()
         # These are interesting items which were either read, or already in the
         # interesting queue (so we don't need to walk them again)
+        # TODO: processed_new_refs becomes O(all_chks), consider switching to
+        #       StaticTuple
         self._processed_new_refs = set()
         self._search_key_func = search_key_func
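
The StaticTuple TODOs above all weigh the same trade-off: casting incoming
keys to StaticTuple (and interning them) keeps long-lived sets such as
_processed_new_refs cheaper, but while callers still pass plain tuples the
cast briefly holds both representations in memory. Below is a sketch of what
such a "barrier" could look like, modelled on the key_filter handling in
CHKMap.iteritems() earlier in this excerpt; the helper name is invented and
whether to do this at all is exactly the open question, so treat it as an
illustration rather than implemented behaviour.

    from bzrlib.static_tuple import StaticTuple

    def _as_interned_keys(keys):
        # Cast every key to an interned StaticTuple, mirroring the
        # as_st()/intern() calls used elsewhere in this module.
        as_st = StaticTuple.from_sequence
        return [as_st(key).intern() for key in keys]

    # Hypothetical use inside __init__ (not part of the current code):
    #   self._new_root_keys = _as_interned_keys(new_root_keys)
    #   self._old_root_keys = _as_interned_keys(old_root_keys)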

         self._state = None

     def _read_nodes_from_store(self, keys):
-        # We chose not to use _page_cache, because we think in terms of records
-        # to be yielded. Also, we expect to touch each page only 1 time during
-        # this code. (We may want to evaluate saving the raw bytes into the
-        # page cache, which would allow a working tree update after the fetch
-        # to not have to read the bytes again.)
+        # We chose not to use _get_cache(), because we think in
+        # terms of records to be yielded. Also, we expect to touch each page
+        # only 1 time during this code. (We may want to evaluate saving the
+        # raw bytes into the page cache, which would allow a working tree
+        # update after the fetch to not have to read the bytes again.)
+        as_st = StaticTuple.from_sequence
         stream = self._store.get_record_stream(keys, 'unordered', True)
         for record in stream:
             if self._pb is not None:
                 self._pb.tick()

             if type(node) is InternalNode:
                 # Note we don't have to do node.refs() because we know that
                 # there are no children that have been pushed into this node
+                # Note: Using as_st() here seemed to save 1.2MB, which would
+                #       indicate that we keep 100k prefix_refs around while
+                #       processing. They *should* be shorter lived than that...
+                #       It does cost us ~10s of processing time
+                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
                 prefix_refs = node._items.items()
                 items = []
             else:
                 prefix_refs = []
+                # Note: We don't use a StaticTuple here. Profiling showed a
+                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
+                #       But a significant slowdown (15s / 145s, or 10%)
                 items = node._items.items()
             yield record, node, prefix_refs, items

                                 if p_r[1] not in all_old_chks]
             new_refs = [p_r[1] for p_r in prefix_refs]
             all_old_chks.update(new_refs)
+            # TODO: This might be a good time to turn items into StaticTuple
+            #       instances and possibly intern them. However, this does not
+            #       impact 'initial branch' performance, so I'm not worrying
+            #       about it here
             self._all_old_items.update(items)
             # Queue up the uninteresting references
             # Don't actually put them in the 'to-read' queue until we have

             # current design allows for this, as callers will do the work
             # to make the results unique. We might profile whether we
             # gain anything by ensuring unique return values for items
+            # TODO: This might be a good time to cast to StaticTuple, as
+            #       self._new_item_queue will hold the contents of multiple
+            #       records for an extended lifetime
             new_items = [item for item in items
                          if item not in self._all_old_items]
             self._new_item_queue.extend(new_items)

             yield None, new_items
         refs = refs.difference(all_old_chks)
+        processed_new_refs.update(refs)
         while refs:
+            # TODO: Using a SimpleSet for self._processed_new_refs and
+            #       saved as much as 10MB of peak memory. However, it requires
+            #       implementing a non-pyrex version.
             next_refs = set()
             next_refs_update = next_refs.update
             # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
             # from 1m54s to 1m51s. Consider it.
             for record, _, p_refs, items in self._read_nodes_from_store(refs):
-                items = [item for item in items
-                         if item not in all_old_items]
+                if all_old_items:
+                    # using the 'if' check saves about 145s => 141s, when
+                    # streaming initial branch of Launchpad data.
+                    items = [item for item in items
+                             if item not in all_old_items]
                 yield record, items
                 next_refs_update([p_r[1] for p_r in p_refs])
+            # set1.difference(set/dict) walks all of set1, and checks if it
+            # exists in 'other'.
+            # set1.difference(iterable) walks all of iterable, and does a
+            # 'difference_update' on a clone of set1. Pick wisely based on the
+            # expected sizes of objects.
+            # in our case it is expected that 'new_refs' will always be quite
+            # small.
             next_refs = next_refs.difference(all_old_chks)
             next_refs = next_refs.difference(processed_new_refs)
             processed_new_refs.update(next_refs)
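
The set1.difference() comment above describes standard CPython behaviour, not
anything bzrlib-specific. The toy example below (with made-up keys) shows why
the argument type matters when, as here, one operand is expected to stay small
while the other can grow very large.

    # Passing a set (or dict): Python walks next_refs itself and probes
    # all_old_chks, so the work tracks len(next_refs).
    next_refs = set([('sha1:aa',), ('sha1:bb',)])
    all_old_chks = set(('sha1:%04x' % i,) for i in range(10000))
    cheap = next_refs.difference(all_old_chks)

    # Passing a plain iterable: Python clones next_refs and then walks the
    # whole iterable calling discard(), so the work tracks the iterable's
    # length -- much worse when the old set is the big one.
    expensive = next_refs.difference(list(all_old_chks))

    assert cheap == expensive == set([('sha1:aa',), ('sha1:bb',)])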

         self._old_queue = []
         all_old_chks = self._all_old_chks
         for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
+            # TODO: Use StaticTuple here?
             self._all_old_items.update(items)
             refs = [r for _,r in prefix_refs if r not in all_old_chks]
             self._old_queue.extend(refs)

 search_key_registry.register('hash-16-way', _search_key_16)
 search_key_registry.register('hash-255-way', _search_key_255)


 def _check_key(key):
+    """Helper function to assert that a key is properly formatted.
+
+    This generally shouldn't be used in production code, but it can be helpful
+    to debug problems.
+    """
     if type(key) is not StaticTuple:
         raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
     if len(key) != 1:
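
For illustration, a key that satisfies _check_key() is a one-element
StaticTuple wrapping a "sha1:..." text key, the same shape the serialise()
code above produces with StaticTuple("sha1:" + sha1,).intern(). The digest in
this sketch is invented, and it assumes the module context above (or importing
_check_key from bzrlib.chk_map).

    from bzrlib.static_tuple import StaticTuple

    key = StaticTuple('sha1:0123456789abcdef0123456789abcdef01234567',).intern()
    _check_key(key)      # passes silently
    try:
        _check_key(('sha1:...',))   # a plain tuple is rejected
    except TypeError:
        pass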