 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,
 # Minimum number of uncompressed bytes to try fetch at once when retrieving
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
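# Illustrative sketch (an aside, with made-up names and data): the partial
# extraction above relies on zlib.decompressobj() accepting a max_length
# argument and exposing unconsumed_tail so decompression can resume later.
import zlib
_z = zlib.compress('x' * 200000)
_d = zlib.decompressobj()
_prefix = _d.decompress(_z, 4096)          # at most 4096 bytes are returned
assert len(_prefix) <= 4096
if _d.unconsumed_tail:                     # compressed input still pending
    _rest = _d.decompress(_d.unconsumed_tail)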
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                 (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)

+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+        Meant only to be used by the test suite.
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)

     def from_bytes(cls, bytes):
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None

     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None

     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))

     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             self._create_z_content_using_lzma()
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
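# Illustrative sketch (an aside, names are hypothetical): compressing a list
# of chunks without joining them first, as _create_z_content_from_chunks now
# does, still yields a single valid zlib stream when the outputs are joined.
import zlib
_chunks = ['first chunk\n', '', 'second chunk\n']   # note the empty chunk
_c = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
_z_chunks = [_c.compress(chunk) for chunk in _chunks]
_z_chunks.append(_c.flush())
_z_chunks = [z for z in _z_chunks if z]             # drop empty outputs
_total_len = sum(map(len, _z_chunks))
assert zlib.decompress(''.join(_z_chunks)) == ''.join(_chunks)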
-        """Encode the information into a byte stream."""
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
             header = self.GCB_LZ_HEADER
             header = self.GCB_HEADER
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks

+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
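# Illustrative sketch (an aside; write_block and out_file are made-up names):
# a caller that can write incrementally can consume to_chunks() directly and
# never pay for the single large string that to_bytes() builds.
def write_block(block, out_file):
    total_len, chunks = block.to_chunks()
    for chunk in chunks:
        out_file.write(chunk)
    return total_len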
     def _dump(self, include_text=False):
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None

+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        if self._get_settings is not None:
+            settings = self._get_settings()
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:

         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
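# Illustrative sketch (an aside, values are made up): the new
# get_compressor_settings callback lets the manager pull settings lazily from
# its creator instead of hard-coding GroupCompressor defaults.
def _example_settings():
    return {'max_bytes_to_index': 1024 * 1024}

_manager = _LazyGroupContentManager(GroupCompressBlock(),
    get_compressor_settings=_example_settings)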
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length

         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        # handle a case where we need to split the block.
+        # Now that we have a user-tweakable option
+        # (max_bytes_to_index), it is possible that one person set it
+        # to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        # avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)

     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
+        # different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
         After calling this, the compressor should no longer be used
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        # group. This has an impact for 'commit' of large objects.
-        # One possibility is to use self._content_chunks, and be lazy and
-        # only fill out self._content as a full string when we actually
-        # need it. That would at least drop the peak memory consumption
-        # for 'commit' down to ~1x the size of the largest file, at a
-        # cost of increased complexity within this code. 2x is still <<
-        # 3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
 class PythonGroupCompressor(_CommonGroupCompressor):

+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines

-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations

     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.

                     memos_to_get_stack.pop()
                 block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)

         self.total_bytes = 0
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                        _DEFAULT_MAX_BYTES_TO_INDEX}

+    def __init__(self, index, access, delta=True, _unadded_refs=None,
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
     def add_lines(self, key, parents, lines, parent_texts=None,
                   left_matching_blocks=None, nostore_sha=None, random_id=False,

         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)

     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
             the defined order, regardless of source.
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:

         for _ in self._insert_record_stream(stream, random_id=False):
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            # repository, so they don't have much of an idea about their
+            # location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
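# Illustrative aside (assumption: GlobalConfig corresponds to the [DEFAULT]
# section of the user's bazaar.conf): a user could raise the sampling limit
# with an entry such as
#
#     [DEFAULT]
#     bzr.groupcompress.max_bytes_to_index = 4194304
#
# Values that do not parse as an integer fall back to
# _DEFAULT_MAX_BYTES_TO_INDEX via the warning branch above.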
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.

         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
-            bytes = self._compressor.flush().to_bytes()
-            self._compressor = GroupCompressor()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
+            self._compressor = self._make_group_compressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            # record and the var 'bytes'), and this generates 2 copies of
+            # the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            # have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            # the fulltext content at this point. Note that sometimes we
+            # will want it later (streaming CHK pages), but most of the
+            # time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())
+class _GCBuildDetails(object):
+    """A blob of data about the build details.

+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.

+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')

+    compression_parent = None

+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info

+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)

+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)

+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)

+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+            return self.index_memo
+            return self.compression_parent # Always None
+            return self._parents
+            return self.record_details
+            raise IndexError('offset out of range')
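# Illustrative aside (the offsets are inferred from the returns above, so
# treat this mapping as an assumption): the thunk keeps old tuple-style
# access working, roughly:
#
#     details[0]  -> index_memo
#     details[1]  -> compression_parent (always None)
#     details[2]  -> parents
#     details[3]  -> record_details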
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
             self._key_dependencies = None
         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
         # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)
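# Illustrative sketch (an aside, names are made up): the setdefault-based
# interning above returns the first int object stored for a given value, so
# repeated offsets share one PyInt instead of allocating a fresh one each time.
_int_cache = {}

def _intern_int(value):
    return _int_cache.setdefault(value, value)

assert _intern_int(12345) is _intern_int(12345)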