        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        # TODO: If we re-use the same content block at different times during
        #       get_record_stream(), it is possible that the first pass will
        #       get inserted, triggering an extract/_ensure_content() which
        #       will get rid of _z_content. And then the next use of the block
        #       will try to access _z_content (to send it over the wire), and
        #       fail because it is already extracted. Consider never releasing
        #       _z_content because of this.
        if self._content_length is None:
            raise AssertionError('self._content_length should never be None')
        if num_bytes is None:
            num_bytes = self._content_length
        elif (self._content_length is not None
              and num_bytes > self._content_length):
            raise AssertionError(
                'requested num_bytes (%d) > content length (%d)'
                % (num_bytes, self._content_length))
                self._content = pylzma.decompress(self._z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(self._z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
        # Any bytes remaining to be decompressed will be in the decompressor's
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
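
# Illustrative sketch (not part of bzrlib): the stdlib mechanism that
# _ensure_content() relies on.  Passing a max_length to
# zlib.decompressobj().decompress() yields at most that many plaintext bytes
# and parks the rest of the compressed input in .unconsumed_tail, so a later
# call can continue exactly where this one stopped.  _DEMO_WINDOW and
# demo_partial_decompress are made-up names mirroring the 32kB slack idea.

import zlib

_DEMO_WINDOW = 32 * 1024

def demo_partial_decompress(z_bytes, num_bytes):
    """Return at least num_bytes of plaintext (or all of it, if shorter)."""
    decomp = zlib.decompressobj()
    content = decomp.decompress(z_bytes, num_bytes + _DEMO_WINDOW)
    while len(content) < num_bytes and decomp.unconsumed_tail:
        needed = num_bytes - len(content)
        content += decomp.decompress(decomp.unconsumed_tail,
                                     needed + _DEMO_WINDOW)
    return content

original = b'lots of repetitive text ' * 1000
compressed = zlib.compress(original)
assert demo_partial_decompress(compressed, 100)[:100] == original[:100]
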

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.
class GroupCompressVersionedFiles(VersionedFiles):
    """A group-compress based VersionedFiles implementation."""

    def __init__(self, index, access, delta=True, _unadded_refs=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._fallback_vfs = []

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs))

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):
        return self.get_record_stream(keys, 'unordered', True)
    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""
        version_id = key[-1]
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            else:
                parents = None
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            flush()
        self._compressor = None
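
# Illustrative sketch (not part of bzrlib): how an index entry of the form
# (key, '%d %d' % (start, end), refs) built in the loop above round-trips.
# Plain tuples stand in for static_tuple.StaticTuple here; the real code
# interns StaticTuples so the many identical parent references share memory.
# demo_build_entry and demo_parse_value are made-up helper names.

def demo_build_entry(key, parents, start_point, end_point):
    # Mirror of the append above: refs is a one-element tuple wrapping the
    # (possibly None) tuple of parent keys.
    if parents is not None:
        parents = tuple(tuple(p) for p in parents)
    refs = (parents,)
    return (key, '%d %d' % (start_point, end_point), refs)

def demo_parse_value(value):
    # The value is a space separated pair of integers: the start and end
    # offsets of the record inside its compressed group.
    start, end = value.split(' ')
    return int(start), int(end)

entry = demo_build_entry(('file-id', 'rev-1'), [('file-id', 'rev-0')], 0, 1423)
assert demo_parse_value(entry[1]) == (0, 1423)
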

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None, track_external_parent_refs=False,
        inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        self.has_graph = parents
        self._is_locked = is_locked
        self._inconsistency_fatal = inconsistency_fatal
        # GroupCompress records tend to have the same 'group' start + offset
        # repeated over and over, which creates a surplus of ints
        self._int_cache = {}
        if track_external_parent_refs:
            self._key_dependencies = knit._KeyRefs(
                track_new_keys=track_new_keys)
        else:
            self._key_dependencies = None
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                # Sometimes these are passed as a list rather than a tuple
                node_refs = static_tuple.as_tuples(node_refs)
                passed = static_tuple.as_tuples(keys[key])
                if node_refs != passed[1]:
                    details = '%s %s %s' % (key, (value, node_refs), passed)
                    if self._inconsistency_fatal:
                        raise errors.KnitCorrupt(self, "inconsistent details"
                                                 " in add_records: %s" %
                                                 details)
                    result.append((key, value))
        records = result
        key_dependencies = self._key_dependencies
        if key_dependencies is not None:
            if self._parents:
                for key, value, refs in records:
                    parents = refs[0]
                    key_dependencies.add_references(key, parents)
            else:
                for key, value, refs in records:
                    new_keys.add_key(key)
        self._add_callback(records)
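
# Illustrative sketch (not part of bzrlib): why the refs are normalised in the
# add_records consistency check above.  Reference lists can arrive as lists,
# tuples, or StaticTuples depending on the caller, so both sides are converted
# to plain nested tuples (the job static_tuple.as_tuples() does) before the
# equality comparison.  demo_as_tuples is a simplified, made-up stand-in.

def demo_as_tuples(obj):
    # Recursively turn any nested sequence of keys into plain tuples; strings
    # are left alone so key elements are not split apart.
    if isinstance(obj, (list, tuple)):
        return tuple(demo_as_tuples(item) for item in obj)
    return obj

node_refs = [[('file-id', 'rev-1')]]          # as stored by one code path
passed = ((('file-id', 'rev-1'),),)           # as passed in by another
assert demo_as_tuples(node_refs) == demo_as_tuples(passed)
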

    def _check_read(self):
    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which probably causes more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        # object.
        return (node[0], start, stop, basis_end, delta_end)
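
# Illustrative sketch (not part of bzrlib): the dict.setdefault() trick used
# above interns equal ints, so every entry in the same group shares one int
# object instead of allocating a fresh PyInt per index node.  _int_cache and
# intern_int here are made-up demo names, not the class attribute itself.

_int_cache = {}

def intern_int(value, _cache=_int_cache):
    # Return the cached int object for this value, storing it on first use.
    return _cache.setdefault(value, value)

a = intern_int(int('12345'))
b = intern_int(int('12345'))
assert a == b   # always true
assert a is b   # true only because both came out of the cache
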
    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid. It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])
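
# Illustrative sketch (not part of bzrlib): the kind of bookkeeping that
# add_references() feeds.  Keys seen while scanning are recorded along with
# every parent they reference; the "missing compression parents" are the
# referenced keys that never show up themselves.  The real implementation is
# knit._KeyRefs; DemoKeyRefs and missing_parents() are made-up names that
# only mirror the idea.

class DemoKeyRefs(object):

    def __init__(self):
        self._keys = set()   # keys already seen in scanned indices
        self._refs = set()   # every parent key referenced by those keys

    def add_references(self, key, parents):
        self._keys.add(key)
        self._refs.update(parents)

    def missing_parents(self):
        # Parents that were referenced but never turned up as keys.
        return self._refs - self._keys


demo = DemoKeyRefs()
demo.add_references(('rev-2',), [('rev-1',)])
demo.add_references(('rev-1',), [('rev-0',)])
assert demo.missing_parents() == set([('rev-0',)])
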

from bzrlib._groupcompress_py import (