=== modified file 'bzrlib/groupcompress.py'
@@ -23 +23 @@
 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,
 # Minimum number of uncompressed bytes to try fetch at once when retrieving
@@ -135 +142 @@
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more than 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
@@ -207 +218 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                 (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
 
     def from_bytes(cls, bytes):
@@ -269 +290 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
@@ -280 +301 @@
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
 
@@ -289 +311 @@
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
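The pattern introduced in _create_z_content_from_chunks() above is plain incremental zlib compression. As a standalone sketch (ordinary Python, not bzrlib code; compress_chunks is a hypothetical helper name):

    import zlib

    def compress_chunks(chunks):
        # Feed each chunk to one compressobj, flush once at the end, and drop
        # the empty strings compress() returns while it is still buffering.
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        compressed = map(compressor.compress, chunks)
        compressed.append(compressor.flush())
        compressed = [c for c in compressed if c]
        return sum(map(len, compressed)), compressed

    z_len, z_chunks = compress_chunks(['a chunk of text\n'] * 500)
    assert zlib.decompress(''.join(z_chunks)) == 'a chunk of text\n' * 500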
@@ -296 +321 @@
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             self._create_z_content_using_lzma()
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-        self._z_content = zlib.compress(self._content)
-        self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
@@ -309 +334 @@
-        """Encode the information into a byte stream."""
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
             header = self.GCB_LZ_HEADER
             header = self.GCB_HEADER
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
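With the hunk above, to_bytes() is reduced to a wrapper that joins whatever to_chunks() produces. A minimal usage sketch, assuming a GroupCompressBlock can be built directly (as the test suite does) and that only the zlib path is exercised:

    from cStringIO import StringIO

    from bzrlib.groupcompress import GroupCompressBlock

    block = GroupCompressBlock()
    block.set_content('some text worth storing\n' * 1000)

    # Stream the serialised block chunk by chunk instead of materialising
    # one large string up front.
    out = StringIO()
    total_len, chunks = block.to_chunks()
    for chunk in chunks:
        out.write(chunk)

    # to_bytes() is now just ''.join() over the same chunks.
    assert out.getvalue() == block.to_bytes()
    assert total_len == len(out.getvalue())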
@@ -460 +491 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        if self._get_settings is not None:
+            settings = self._get_settings()
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
@@ -503 +548 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
 
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
 
@@ -523 +571 @@
         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        # handle a case where we need to split the block.
+        # Now that we have a user-tweakable option
+        # (max_bytes_to_index), it is possible that one person set it
+        # to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -679 +732 @@
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        # avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
 
@@ -691 +746 @@
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
+        # different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
@@ -855 +911 @@
         After calling this, the compressor should no longer be used
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        # group. This has an impact for 'commit' of large objects.
-        # One possibility is to use self._content_chunks, and be lazy and
-        # only fill out self._content as a full string when we actually
-        # need it. That would at least drop the peak memory consumption
-        # for 'commit' down to ~1x the size of the largest file, at a
-        # cost of increased complexity within this code. 2x is still <<
-        # 3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -948 +995 @@
+    def __init__(self, settings=None):
         super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+            max_bytes_to_index = \
+                GroupCompressVersionedFiles._DEFAULT_MAX_BYTES_TO_INDEX
+            (max_bytes_to_index,) = settings
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1027 +1080 @@
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
@@ -1044 +1097 @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
 
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
 
@@ -1102 +1156 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.
 
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
 
@@ -1136 +1190 @@
                     memos_to_get_stack.pop()
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
         self.total_bytes = 0
@@ -1152 +1207 @@
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = (_DEFAULT_MAX_BYTES_TO_INDEX,)
+
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
 
     def add_lines(self, key, parents, lines, parent_texts=None,
                   left_matching_blocks=None, nostore_sha=None, random_id=False,
@@ -1293 +1365 @@
         # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
         parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
+        for fallback in self._transitive_fallbacks():
             if not missing_keys:
             (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
@@ -1451 +1523 @@
             the defined order, regardless of source.
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
 
@@ -1546 +1618 @@
         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1598 +1671 @@
         for _ in self._insert_record_stream(stream, random_id=False):
 
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            # repository, so they don't have much of an idea about their
+            # location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return (self._max_bytes_to_index,)
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
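The new setting is deliberately coarse: _get_compressor_settings() consults a single global option, and a non-integer value falls back to the 1MB default with a warning. A hedged sketch of setting and reading that option through the config API (assuming GlobalConfig.set_user_option is available here, as it is for other global options):

    from bzrlib import config

    c = config.GlobalConfig()
    # Store the value as a string; _get_compressor_settings() parses it with
    # int() and ignores values that do not parse.
    c.set_user_option('bzr.groupcompress.max_bytes_to_index',
                      '%d' % (4 * 1024 * 1024))
    print c.get_user_option('bzr.groupcompress.max_bytes_to_index')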
@@ -1601 +1698 @@
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
 
@@ -1627 +1724 @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
-            bytes = self._compressor.flush().to_bytes()
-            self._compressor = GroupCompressor()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
+            self._compressor = self._make_group_compressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            # record and the var 'bytes'), and this generates 2 copies of
+            # the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            # have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            # the fulltext content at this point. Note that sometimes we
+            # will want it later (streaming CHK pages), but most of the
+            # time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
@@ -1802 +1910 @@
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())
@@ +1920 @@
+class _GCBuildDetails(object):
+    """A blob of data about the build details.
+
+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.
+    """
+
+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')
+
+    compression_parent = None
+
+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info
+
+    def __repr__(self):
+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)
+
+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)
+
+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)
+
+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+        if offset == 0:
+            return self.index_memo
+        elif offset == 1:
+            return self.compression_parent # Always None
+        elif offset == 2:
+            return self._parents
+        elif offset == 3:
+            return self.record_details
+        else:
+            raise IndexError('offset out of range')
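Assuming the branches above follow the conventional (index_memo, compression_parent, parents, record_details) layout, tuple-style access behaves as in this sketch; the index object, key, and offsets are made-up values:

    from bzrlib.groupcompress import _GCBuildDetails

    details = _GCBuildDetails(parents=(('parent-key',),),
                              position_info=('index-obj', 0, 4096, 128, 4000))

    assert details[0] == details.index_memo   # (index, group_start, group_end, basis_end, delta_end)
    assert details[1] is None                 # compression_parent is always None for groupcompress
    assert details[2] == (('parent-key',),)   # parents
    assert details[3] == details.record_details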
@@ -1812 +1968 @@
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
 
@@ -1843 +1999 @@
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
             self._key_dependencies = None
 
@@ -2033 +2188 @@
         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
         # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)
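The _int_cache trick above is plain interning via dict.setdefault; a standalone illustration of why it saves memory when the same offsets repeat across many index rows:

    # Not bzrlib code: a minimal sketch of the interning pattern.
    _int_cache = {}

    def intern_offset(value):
        # setdefault returns the already-cached object when the value has been
        # seen before, so repeated offsets all share a single int object.
        return _int_cache.setdefault(value, value)

    a = intern_offset(123456)
    b = intern_offset(123456)
    assert a is b   # same object, not merely an equal one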