@@ -23 +23 @@
 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,
 # Minimum number of uncompressed bytes to try fetch at once when retrieving
@@ -135 +142 @@
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
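
For reference, the partial-decompression trick this hunk relies on can be exercised on its own. The following is a minimal, self-contained sketch (not bzrlib code; the helper name is invented): `zlib.decompressobj` is given the whole compressed string but a capped output size, and `unconsumed_tail` tells the caller whether compressed input is still pending.

    import zlib

    def partial_decompress(z_bytes, want_bytes, window=32768):
        # Decompress roughly the first want_bytes of a zlib stream.  The
        # max_length argument caps the returned plaintext; whatever compressed
        # input was not consumed stays in decomp.unconsumed_tail.
        decomp = zlib.decompressobj()
        out = decomp.decompress(z_bytes, want_bytes + window)
        done = not decomp.unconsumed_tail
        return out, decomp, done

    z = zlib.compress('x' * 1000000)
    prefix, decomp, done = partial_decompress(z, 4096)
    # 'done' is False here; feeding decomp.unconsumed_tail back into
    # decomp.decompress() would yield the rest of the text.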
@@ -207 +218 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
 
     def from_bytes(cls, bytes):
@@ -269 +290 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
 
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
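
As an aside, the chunk-at-a-time compression pattern used by the new `_create_z_content_from_chunks` can be sketched standalone (the helper name is made up): one `compressobj` is shared across all chunks, empty outputs are dropped, and the total compressed length is a `sum()` rather than the length of a joined string.

    import zlib

    def compress_chunks(chunks):
        # One zlib stream across many input chunks; the concatenation of the
        # returned pieces is a valid zlib stream.
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        out = [compressor.compress(chunk) for chunk in chunks]
        out.append(compressor.flush())
        out = [c for c in out if c]   # zlib usually returns '' until flush
        return out, sum(map(len, out))

    z_chunks, z_len = compress_chunks(['hello world\n'] * 1000)
    assert zlib.decompress(''.join(z_chunks)) == 'hello world\n' * 1000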
@@ -296 +321 @@
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
+            chunks = self._content_chunks
         else:
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
 
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+                  ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
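
The net effect of the hunk above is that `to_bytes()` is now a thin wrapper over `to_chunks()`, so callers that can handle a list of strings avoid one large join. A hedged consumer sketch (the `write_block` helper and the file object are illustrative, not bzrlib API):

    def write_block(block, out_file):
        # 'block' is assumed to be a GroupCompressBlock as patched above.
        total_len, chunks = block.to_chunks()
        for chunk in chunks:
            out_file.write(chunk)
        return total_len   # equals len(block.to_bytes()), without the join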
@@ -460 +491 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
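
The settings callback added to `_LazyGroupContentManager.__init__` is evaluated lazily and at most once; until a block actually needs repacking, no settings lookup happens. A standalone sketch of that caching pattern (class and function names here are invented for illustration):

    class _LazySettings(object):

        _DEFAULT = {'max_bytes_to_index': 1024 * 1024}

        def __init__(self, get_settings=None):
            self._get_settings = get_settings
            self._cached = None

        def settings(self):
            if self._cached is not None:
                return self._cached
            settings = None
            if self._get_settings is not None:
                settings = self._get_settings()
            if settings is None:
                settings = self._DEFAULT
            self._cached = settings
            return settings

    calls = []
    def expensive_lookup():
        calls.append(1)
        return {'max_bytes_to_index': 256 * 1024}

    lazy = _LazySettings(expensive_lookup)
    lazy.settings(); lazy.settings()
    assert len(calls) == 1   # the callback ran only once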
@@ -503 +548 @@
             new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
 
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
@@ -523 +571 @@
         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -679 +732 @@
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
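
For orientation, the wire layout that `_wire_bytes` produces and `from_bytes` (next hunk) parses is a storage-kind line, three length lines, the zlib-compressed header, then the block. A minimal round-trip sketch under that assumption (dummy payloads and invented helper names; the real header and block contents are bzr-specific):

    import zlib

    def frame(storage_kind, header_bytes, block_chunks):
        z_header = zlib.compress(header_bytes)
        lines = ['%s\n%d\n%d\n%d\n' % (storage_kind, len(z_header),
                                       len(header_bytes),
                                       sum(map(len, block_chunks)))]
        lines.append(z_header)
        lines.extend(block_chunks)
        return ''.join(lines)

    def unframe(data):
        (storage_kind, z_header_len, header_len,
         block_len, rest) = data.split('\n', 4)
        z_header_len = int(z_header_len)
        header = zlib.decompress(rest[:z_header_len])
        assert len(header) == int(header_len)
        block = rest[z_header_len:]
        assert len(block) == int(block_len)
        return storage_kind, header, block

    wire = frame('groupcompress-block', 'key1\x00rev-1\n', ['z' * 10, 'q' * 5])
    assert unframe(wire)[1] == 'key1\x00rev-1\n'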
@@ -691 +746 @@
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
         #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
@@ -757 +813 @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -855 +915 @@
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -886 +938 @@
 class PythonGroupCompressor(_CommonGroupCompressor):
 
-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.
 
         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ -947 +998 @@
-    def __init__(self):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1027 +1080 @@
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
@@ -1044 +1097 @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
 
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
@@ -1102 +1156 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.
 
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
@@ -1136 +1190 @@
                     memos_to_get_stack.pop()
                     block = self.batch_memos[read_memo]
-                    self.manager = _LazyGroupContentManager(block)
+                    self.manager = _LazyGroupContentManager(block,
+                        get_compressor_settings=self._get_compressor_settings)
                     self.last_read_memo = read_memo
                 start, end = index_memo[3:5]
                 self.manager.add_factory(key, parents, start, end)
@@ -1149 +1204 @@
         self.total_bytes = 0
@@ -1152 +1207 @@
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
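
For scale, the default above yields the settings dict `{'max_bytes_to_index': 1048576}`; a caller could hand the compressor a smaller budget. A hedged usage sketch (the 256kB figure is an arbitrary example; `GroupCompressor` is whichever of the two compressor classes the module selected at import time):

    small = {'max_bytes_to_index': 256 * 1024}   # example value only
    compressor = GroupCompressor(small)
    # With the C extension this reaches DeltaIndex(max_bytes_to_index=262144);
    # with no settings the compressor falls back to an empty dict, so
    # max_bytes_to_index resolves to 0 per the PyrexGroupCompressor hunk above.
    default_compressor = GroupCompressor()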
@@ -1155 +1222 @@
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         self._delta = delta
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
 
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,
@@ -1287 +1360 @@
         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)
 
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
-        return kg
 
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ -1451 +1508 @@
             the defined order, regardless of source.
         """
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
@@ -1546 +1603 @@
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1598 +1656 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass
 
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
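
The option is read from the user's global configuration via `config.GlobalConfig().get_user_option('bzr.groupcompress.max_bytes_to_index')`; anything that does not parse as an integer is warned about and ignored. A standalone sketch of that parse-with-fallback behaviour (the helper name is invented):

    def parse_max_bytes_to_index(raw, default=1024 * 1024):
        # Mirrors the handling above: unset or malformed values fall back to
        # the 1MB default instead of raising.
        val = raw
        if val is not None:
            try:
                val = int(val)
            except ValueError:
                val = None
        if val is None:
            val = default
        return val

    assert parse_max_bytes_to_index(None) == 1024 * 1024
    assert parse_max_bytes_to_index('262144') == 262144
    assert parse_max_bytes_to_index('not-a-number') == 1024 * 1024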
@@ -1601 +1683 @@
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1627 +1709 @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
-            self._compressor = GroupCompressor()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
+            self._compressor = self._make_group_compressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
@@ -1802 +1895 @@
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         result = set()
         for source in sources:
             result.update(source.keys())
@@ -1811 +1905 @@
+class _GCBuildDetails(object):
+    """A blob of data about the build details.
+
+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.
+    """
+
+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')
+
+    method = 'group'
+    compression_parent = None
+
+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info
+
+    def __repr__(self):
+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)
+
+    @property
+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)
+
+    @property
+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)
+
+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+        if offset == 0:
+            return self.index_memo
+        elif offset == 1:
+            return self.compression_parent # Always None
+        elif offset == 2:
+            return self._parents
+        elif offset == 3:
+            return self.record_details
+        else:
+            raise IndexError('offset out of range')
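
`_GCBuildDetails` stands in for the 4-tuple the old code stored per key, while `__slots__` keeps it small; indexing still behaves like the tuple did. A hedged illustration (the key and position values are made up):

    details = _GCBuildDetails(parents=(('file-id', 'rev-1'),),
                              position_info=('index', 0, 1024, 0, 512))
    assert details[0] == details.index_memo == ('index', 0, 1024, 0, 512)
    assert details[1] is None                     # compression_parent
    assert details[2] == (('file-id', 'rev-1'),)  # parents
    assert details[3] == details.record_details   # ('group', None)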
@@ -1812 +1953 @@
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
@@ -1843 +1984 @@
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None
@@ -2033 +2173 @@
         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
         # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)