17
17
"""Core compression logic for compressing streams of related files."""
19
from __future__ import absolute_import
24
from ..lazy_import import lazy_import
25
lazy_import(globals(), """
30
30
graph as _mod_graph,
37
from bzrlib.btree_index import BTreeBuilder
38
from bzrlib.lru_cache import LRUSizeCache
39
from bzrlib.tsort import topo_sort
40
from bzrlib.versionedfile import (
36
from breezy.bzr import (
42
from breezy.i18n import gettext
48
from .btree_index import BTreeBuilder
49
from ..lru_cache import LRUSizeCache
50
from ..sixish import (
55
from .versionedfile import (
42
58
AbsentContentFactory,
43
59
ChunkedContentFactory,
44
60
FulltextContentFactory,
61
VersionedFilesWithFallbacks,
48
64
# Minimum number of uncompressed bytes to try fetch at once when retrieving
49
65
# groupcompress blocks.
52
_USE_LZMA = False and (pylzma is not None)
54
68
# osutils.sha_string('')
55
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
69
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
57
71
def sort_gc_optimal(parent_map):
58
72
"""Sort and group the keys in parent_map into groupcompress order.
65
79
# groupcompress ordering is approximately reverse topological,
66
80
# properly grouped by file-id.
67
81
per_prefix_map = {}
68
for key, value in parent_map.iteritems():
69
if isinstance(key, str) or len(key) == 1:
82
for key, value in viewitems(parent_map):
83
if isinstance(key, bytes) or len(key) == 1:
79
93
for prefix in sorted(per_prefix_map):
80
present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
94
present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
95
return present_keys
98
class DecompressCorruption(errors.BzrError):
100
_fmt = "Corruption while decompressing repository file%(orig_error)s"
102
def __init__(self, orig_error=None):
103
if orig_error is not None:
104
self.orig_error = ", %s" % (orig_error,)
107
errors.BzrError.__init__(self)
84
110
# The max zlib window size is 32kB, so if we set 'max_size' output of the
85
111
# decompressor to the requested bytes + 32kB, then we should guarantee
86
112
# num_bytes coming out.
95
121
# Group Compress Block v1 Zlib
96
GCB_HEADER = 'gcb1z\n'
122
GCB_HEADER = b'gcb1z\n'
97
123
# Group Compress Block v1 Lzma
98
GCB_LZ_HEADER = 'gcb1l\n'
124
GCB_LZ_HEADER = b'gcb1l\n'
99
125
GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
101
127
def __init__(self):
102
128
# map by key? or just order in file?
103
129
self._compressor_name = None
104
self._z_content = None
130
self._z_content_chunks = None
105
131
self._z_content_decompressor = None
106
132
self._z_content_length = None
107
133
self._content_length = None
132
158
# Expand the content if required
133
159
if self._content is None:
134
160
if self._content_chunks is not None:
135
self._content = ''.join(self._content_chunks)
161
self._content = b''.join(self._content_chunks)
136
162
self._content_chunks = None
137
163
if self._content is None:
138
if self._z_content is None:
164
# We join self._z_content_chunks here, because if we are
165
# decompressing, then it is *very* likely that we have a single
167
if self._z_content_chunks is None:
139
168
raise AssertionError('No content to decompress')
140
if self._z_content == '':
169
z_content = b''.join(self._z_content_chunks)
142
172
elif self._compressor_name == 'lzma':
143
173
# We don't do partial lzma decomp yet
144
self._content = pylzma.decompress(self._z_content)
175
self._content = pylzma.decompress(z_content)
145
176
elif self._compressor_name == 'zlib':
146
177
# Start a zlib decompressor
147
178
if num_bytes * 4 > self._content_length * 3:
148
179
# If we are requesting more than 3/4ths of the content,
149
180
# just extract the whole thing in a single pass
150
181
num_bytes = self._content_length
151
self._content = zlib.decompress(self._z_content)
182
self._content = zlib.decompress(z_content)
153
184
self._z_content_decompressor = zlib.decompressobj()
154
185
# Seed the decompressor with the uncompressed bytes, so
155
186
# that the rest of the code is simplified
156
187
self._content = self._z_content_decompressor.decompress(
157
self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
188
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
189
if not self._z_content_decompressor.unconsumed_tail:
159
190
self._z_content_decompressor = None
197
228
# At present, we have 2 integers for the compressed and uncompressed
198
229
# content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
199
230
# checking too far, cap the search to 14 bytes.
200
pos2 = bytes.index('\n', pos, pos + 14)
201
self._z_content_length = int(bytes[pos:pos2])
203
pos2 = bytes.index('\n', pos, pos + 14)
204
self._content_length = int(bytes[pos:pos2])
206
if len(bytes) != (pos + self._z_content_length):
231
pos2 = data.index(b'\n', pos, pos + 14)
232
self._z_content_length = int(data[pos:pos2])
234
pos2 = data.index(b'\n', pos, pos + 14)
235
self._content_length = int(data[pos:pos2])
237
if len(data) != (pos + self._z_content_length):
207
238
# XXX: Define some GCCorrupt error ?
208
239
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
(len(bytes), pos, self._z_content_length))
210
self._z_content = bytes[pos:]
240
(len(data), pos, self._z_content_length))
241
self._z_content_chunks = (data[pos:],)
244
def _z_content(self):
245
"""Return z_content_chunks as a simple string.
247
Meant only to be used by the test suite.
249
if self._z_content_chunks is not None:
250
return b''.join(self._z_content_chunks)
213
254
def from_bytes(cls, bytes):
233
274
:return: The bytes for the content
235
276
if start == end == 0:
237
278
self._ensure_content(end)
238
279
# The bytes are 'f' or 'd' for the type, then a variable-length
239
280
# base128 integer for the content size, then the actual content
240
281
# We know that the variable-length integer won't be longer than 5
241
282
# bytes (it takes 5 bytes to encode 2^32)
242
283
c = self._content[start]
244
285
type = 'fulltext'
247
288
raise ValueError('Unknown content control code: %s'
253
294
if end != content_start + content_len:
254
295
raise ValueError('end != len according to field header'
255
296
' %s != %s' % (end, content_start + content_len))
257
bytes = self._content[content_start:end]
259
bytes = apply_delta_to_source(self._content, content_start, end)
298
return self._content[content_start:end]
299
# Must be type delta as checked above
300
return apply_delta_to_source(self._content, content_start, end)
262
302
def set_chunked_content(self, content_chunks, length):
263
303
"""Set the content of this block to the given chunks."""
269
309
self._content_length = length
270
310
self._content_chunks = content_chunks
271
311
self._content = None
272
self._z_content = None
312
self._z_content_chunks = None
274
314
def set_content(self, content):
275
315
"""Set the content of this block."""
276
316
self._content_length = len(content)
277
317
self._content = content
278
self._z_content = None
280
def _create_z_content_using_lzma(self):
281
if self._content_chunks is not None:
282
self._content = ''.join(self._content_chunks)
283
self._content_chunks = None
284
if self._content is None:
285
raise AssertionError('Nothing to compress')
286
self._z_content = pylzma.compress(self._content)
287
self._z_content_length = len(self._z_content)
289
def _create_z_content_from_chunks(self):
318
self._z_content_chunks = None
320
def _create_z_content_from_chunks(self, chunks):
290
321
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
compressed_chunks = map(compressor.compress, self._content_chunks)
322
# Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
323
# (measured peak is maybe 30MB over the above...)
324
compressed_chunks = list(map(compressor.compress, chunks))
292
325
compressed_chunks.append(compressor.flush())
293
self._z_content = ''.join(compressed_chunks)
294
self._z_content_length = len(self._z_content)
326
# Ignore empty chunks
327
self._z_content_chunks = [c for c in compressed_chunks if c]
328
self._z_content_length = sum(map(len, self._z_content_chunks))
296
330
def _create_z_content(self):
297
if self._z_content is not None:
300
self._create_z_content_using_lzma()
331
if self._z_content_chunks is not None:
302
333
if self._content_chunks is not None:
303
self._create_z_content_from_chunks()
305
self._z_content = zlib.compress(self._content)
306
self._z_content_length = len(self._z_content)
334
chunks = self._content_chunks
336
chunks = (self._content,)
337
self._create_z_content_from_chunks(chunks)
340
"""Create the byte stream as a series of 'chunks'"""
341
self._create_z_content()
342
header = self.GCB_HEADER
343
chunks = [b'%s%d\n%d\n'
344
% (header, self._z_content_length, self._content_length),
346
chunks.extend(self._z_content_chunks)
347
total_len = sum(map(len, chunks))
348
return total_len, chunks
308
350
def to_bytes(self):
309
351
"""Encode the information into a byte stream."""
310
self._create_z_content()
312
header = self.GCB_LZ_HEADER
314
header = self.GCB_HEADER
316
'%d\n%d\n' % (self._z_content_length, self._content_length),
319
return ''.join(chunks)
352
total_len, chunks = self.to_chunks()
353
return b''.join(chunks)
321
355
def _dump(self, include_text=False):
322
356
"""Take this block, and spit out a human-readable structure.
334
368
while pos < self._content_length:
335
369
kind = self._content[pos]
337
if kind not in ('f', 'd'):
371
if kind not in (b'f', b'd'):
338
372
raise ValueError('invalid kind character: %r' % (kind,))
339
373
content_len, len_len = decode_base128_int(
340
374
self._content[pos:pos + 5])
342
376
if content_len + pos > self._content_length:
343
377
raise ValueError('invalid content_len %d for record @ pos %d'
344
378
% (content_len, pos - len_len - 1))
345
if kind == 'f': # Fulltext
379
if kind == b'f': # Fulltext
347
381
text = self._content[pos:pos+content_len]
348
result.append(('f', content_len, text))
382
result.append((b'f', content_len, text))
350
result.append(('f', content_len))
351
elif kind == 'd': # Delta
384
result.append((b'f', content_len))
385
elif kind == b'd': # Delta
352
386
delta_content = self._content[pos:pos+content_len]
354
388
# The first entry in a delta is the decompressed length
355
389
decomp_len, delta_pos = decode_base128_int(delta_content)
356
result.append(('d', content_len, decomp_len, delta_info))
390
result.append((b'd', content_len, decomp_len, delta_info))
358
392
while delta_pos < content_len:
359
393
c = ord(delta_content[delta_pos])
366
400
text = self._content[offset:offset+length]
367
delta_info.append(('c', offset, length, text))
401
delta_info.append((b'c', offset, length, text))
369
delta_info.append(('c', offset, length))
403
delta_info.append((b'c', offset, length))
370
404
measured_len += length
373
407
txt = delta_content[delta_pos:delta_pos+c]
376
delta_info.append(('i', c, txt))
410
delta_info.append((b'i', c, txt))
377
411
measured_len += c
379
413
if delta_pos != content_len:
429
463
# wire bytes, something...
430
464
return self._manager._wire_bytes()
433
467
if storage_kind in ('fulltext', 'chunked'):
434
468
if self._bytes is None:
435
469
# Grab and cache the raw bytes for this entry
436
470
# and break the ref-cycle with _manager since we don't need it
438
self._manager._prepare_for_extract()
473
self._manager._prepare_for_extract()
474
except zlib.error as value:
475
raise DecompressCorruption("zlib: " + str(value))
439
476
block = self._manager._block
440
477
self._bytes = block.extract(self.key, self._start, self._end)
441
478
# There are code paths that first extract as fulltext, and then
460
497
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
461
498
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
463
def __init__(self, block):
500
def __init__(self, block, get_compressor_settings=None):
464
501
self._block = block
465
502
# We need to preserve the ordering
466
503
self._factories = []
467
504
self._last_byte = 0
505
self._get_settings = get_compressor_settings
506
self._compressor_settings = None
508
def _get_compressor_settings(self):
509
if self._compressor_settings is not None:
510
return self._compressor_settings
512
if self._get_settings is not None:
513
settings = self._get_settings()
515
vf = GroupCompressVersionedFiles
516
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
517
self._compressor_settings = settings
518
return self._compressor_settings
469
520
def add_factory(self, key, parents, start, end):
470
521
if not self._factories:
503
554
new_block.set_content(self._block._content[:last_byte])
504
555
self._block = new_block
557
def _make_group_compressor(self):
558
return GroupCompressor(self._get_compressor_settings())
506
560
def _rebuild_block(self):
507
561
"""Create a new GroupCompressBlock with only the referenced texts."""
508
compressor = GroupCompressor()
562
compressor = self._make_group_compressor()
509
563
tstart = time.time()
510
564
old_length = self._block._content_length
523
577
# block? It seems hard to come up with a method that it would
524
578
# expand, since we do full compression again. Perhaps based on a
525
579
# request that ends up poorly ordered?
580
# TODO: If the content would have expanded, then we would want to
581
# handle a case where we need to split the block.
582
# Now that we have a user-tweakable option
583
# (max_bytes_to_index), it is possible that one person set it
584
# to a very low value, causing poor compression.
526
585
delta = time.time() - tstart
527
586
self._block = new_block
528
587
trace.mutter('creating new compressed block on-the-fly in %.3fs'
679
738
z_header_bytes = zlib.compress(header_bytes)
681
740
z_header_bytes_len = len(z_header_bytes)
682
block_bytes = self._block.to_bytes()
741
block_bytes_len, block_chunks = self._block.to_chunks()
683
742
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
685
744
lines.append(z_header_bytes)
686
lines.append(block_bytes)
687
del z_header_bytes, block_bytes
745
lines.extend(block_chunks)
746
del z_header_bytes, block_chunks
747
# TODO: This is a point where we will double the memory consumption. To
748
# avoid this, we probably have to switch to a 'chunked' api
688
749
return ''.join(lines)
691
752
def from_bytes(cls, bytes):
692
753
# TODO: This does extra string copying, probably better to do it a
754
# different way. At a minimum this creates 2 copies of the
694
756
(storage_kind, z_header_len, header_len,
695
757
block_len, rest) = bytes.split('\n', 4)
792
858
if sha1 == nostore_sha:
793
859
raise errors.ExistingContent()
794
860
if key[-1] is None:
795
key = key[:-1] + ('sha1:' + sha1,)
861
# GZ 2017-06-10: Seems perverse to have to encode here.
862
sha1 = sha1.encode('ascii')
863
key = key[:-1] + (b'sha1:' + sha1,)
797
865
start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
798
866
return sha1, start, end, type
825
893
(start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
826
894
delta_chunks = self.chunks[start_chunk:end_chunk]
827
895
stored_bytes = ''.join(delta_chunks)
828
if stored_bytes[0] == 'f':
896
if stored_bytes[0] == b'f':
829
897
fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
830
898
data_len = fulltext_len + 1 + offset
831
899
if data_len != len(stored_bytes):
855
923
After calling this, the compressor should no longer be used
857
# TODO: this causes us to 'bloat' to 2x the size of content in the
858
# group. This has an impact for 'commit' of large objects.
859
# One possibility is to use self._content_chunks, and be lazy and
860
# only fill out self._content as a full string when we actually
861
# need it. That would at least drop the peak memory consumption
862
# for 'commit' down to ~1x the size of the largest file, at a
863
# cost of increased complexity within this code. 2x is still <<
864
# 3x the size of the largest file, so we are doing ok.
865
925
self._block.set_chunked_content(self.chunks, self.endpoint)
866
926
self.chunks = None
867
927
self._delta_index = None
886
946
class PythonGroupCompressor(_CommonGroupCompressor):
948
def __init__(self, settings=None):
889
949
"""Create a GroupCompressor.
891
951
Used only if the pyrex version is not available.
893
super(PythonGroupCompressor, self).__init__()
953
super(PythonGroupCompressor, self).__init__(settings)
894
954
self._delta_index = LinesDeltaIndex([])
895
955
# The actual content is managed by LinesDeltaIndex
896
956
self.chunks = self._delta_index.lines
905
965
if delta_length > max_delta_size:
906
966
# The delta is longer than the fulltext, insert a fulltext
907
967
type = 'fulltext'
908
out_lines = ['f', encode_base128_int(input_len)]
968
out_lines = [b'f', encode_base128_int(input_len)]
909
969
out_lines.extend(new_lines)
910
970
index_lines = [False, False]
911
971
index_lines.extend([True] * len(new_lines))
913
973
# this is a worthy delta, output it
916
976
# Update the delta_length to include those two encoded integers
917
977
out_lines[1] = encode_base128_int(delta_length)
918
978
# Before insertion
934
994
It contains code very similar to SequenceMatcher because of having a similar
935
995
task. However some key differences apply:
936
- there is no junk, we want a minimal edit not a human readable diff.
937
- we don't filter very common lines (because we don't know where a good
938
range will start, and after the first text we want to be emitting minimal
940
- we chain the left side, not the right side
941
- we incrementally update the adjacency matrix as new lines are provided.
942
- we look for matches in all of the left side, so the routine which does
943
the analogous task of find_longest_match does not need to filter on the
997
* there is no junk, we want a minimal edit not a human readable diff.
998
* we don't filter very common lines (because we don't know where a good
999
range will start, and after the first text we want to be emitting minimal
1001
* we chain the left side, not the right side
1002
* we incrementally update the adjacency matrix as new lines are provided.
1003
* we look for matches in all of the left side, so the routine which does
1004
the analogous task of find_longest_match does not need to filter on the
948
super(PyrexGroupCompressor, self).__init__()
949
self._delta_index = DeltaIndex()
1008
def __init__(self, settings=None):
1009
super(PyrexGroupCompressor, self).__init__(settings)
1010
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1011
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
951
1013
def _compress(self, key, bytes, max_delta_size, soft=False):
952
1014
"""see _CommonGroupCompressor._compress"""
970
1032
enc_length = encode_base128_int(len(bytes))
971
1033
len_mini_header = 1 + len(enc_length)
972
1034
self._delta_index.add_source(bytes, len_mini_header)
973
new_chunks = ['f', enc_length, bytes]
1035
new_chunks = [b'f', enc_length, bytes]
976
1038
enc_length = encode_base128_int(len(delta))
977
1039
len_mini_header = 1 + len(enc_length)
978
new_chunks = ['d', enc_length, delta]
1040
new_chunks = [b'd', enc_length, delta]
979
1041
self._delta_index.add_delta_source(delta, len_mini_header)
980
1042
# Before insertion
981
1043
start = self.endpoint
1027
1089
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
1090
add_callback=graph_index.add_nodes,
1029
1091
inconsistency_fatal=inconsistency_fatal)
1030
access = knit._DirectPackAccess({})
1092
access = pack_repo._DirectPackAccess({})
1031
1093
access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1094
result = GroupCompressVersionedFiles(index, access, delta)
1033
1095
result.stream = stream
1044
1106
class _BatchingBlockFetcher(object):
1045
1107
"""Fetch group compress blocks in batches.
1047
1109
:ivar total_bytes: int of expected number of bytes needed to fetch the
1048
1110
currently pending batch.
1051
def __init__(self, gcvf, locations):
1113
def __init__(self, gcvf, locations, get_compressor_settings=None):
1052
1114
self.gcvf = gcvf
1053
1115
self.locations = locations
1136
1199
memos_to_get_stack.pop()
1138
1201
block = self.batch_memos[read_memo]
1139
self.manager = _LazyGroupContentManager(block)
1202
self.manager = _LazyGroupContentManager(block,
1203
get_compressor_settings=self._get_compressor_settings)
1140
1204
self.last_read_memo = read_memo
1141
1205
start, end = index_memo[3:5]
1142
1206
self.manager.add_factory(key, parents, start, end)
1149
1213
self.total_bytes = 0
1152
class GroupCompressVersionedFiles(VersionedFiles):
1216
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1153
1217
"""A group-compress based VersionedFiles implementation."""
1155
def __init__(self, index, access, delta=True, _unadded_refs=None):
1219
# This controls how the GroupCompress DeltaIndex works. Basically, we
1220
# compute hash pointers into the source blocks (so hash(text) => text).
1221
# However each of these references costs some memory in trade against a
1222
# more accurate match result. For very large files, they either are
1223
# pre-compressed and change in bulk whenever they change, or change in just
1224
# local blocks. Either way, 'improved resolution' is not very helpful,
1225
# versus running out of memory trying to track everything. The default max
1226
# gives 100% sampling of a 1MB file.
1227
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1228
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1229
_DEFAULT_MAX_BYTES_TO_INDEX}
1231
def __init__(self, index, access, delta=True, _unadded_refs=None,
1156
1233
"""Create a GroupCompressVersionedFiles object.
1158
1235
:param index: The index object storing access and graph data.
1159
1236
:param access: The access object storing raw data.
1160
1237
:param delta: Whether to delta compress or just entropy compress.
1161
1238
:param _unadded_refs: private parameter, don't use.
1239
:param _group_cache: private parameter, don't use.
1163
1241
self._index = index
1164
1242
self._access = access
1166
1244
if _unadded_refs is None:
1167
1245
_unadded_refs = {}
1168
1246
self._unadded_refs = _unadded_refs
1169
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
self._fallback_vfs = []
1247
if _group_cache is None:
1248
_group_cache = LRUSizeCache(max_size=50*1024*1024)
1249
self._group_cache = _group_cache
1250
self._immediate_fallback_vfs = []
1251
self._max_bytes_to_index = None
1172
1253
def without_fallbacks(self):
1173
1254
"""Return a clone of this object without any fallbacks configured."""
1174
1255
return GroupCompressVersionedFiles(self._index, self._access,
1175
self._delta, _unadded_refs=dict(self._unadded_refs))
1256
self._delta, _unadded_refs=dict(self._unadded_refs),
1257
_group_cache=self._group_cache)
1177
1259
def add_lines(self, key, parents, lines, parent_texts=None,
1178
1260
left_matching_blocks=None, nostore_sha=None, random_id=False,
1182
1264
:param key: The key tuple of the text to add.
1183
1265
:param parents: The parents key tuples of the text to add.
1184
1266
:param lines: A list of lines. Each line must be a bytestring. And all
1185
of them except the last must be terminated with \n and contain no
1186
other \n's. The last line may either contain no \n's or a single
1187
terminating \n. If the lines list does not meet this constraint the add
1188
routine may error or may succeed - but you will be unable to read
1189
the data back accurately. (Checking the lines have been split
1267
of them except the last must be terminated with \\n and contain no
1268
other \\n's. The last line may either contain no \\n's or a single
1269
terminating \\n. If the lines list does not meet this constraint the
1270
add routine may error or may succeed - but you will be unable to
1271
read the data back accurately. (Checking the lines have been split
1190
1272
correctly is expensive and extremely unlikely to catch bugs so it
1191
1273
is not done at runtime unless check_content is True.)
1192
1274
:param parent_texts: An optional dictionary containing the opaque
1287
1369
self._check_lines_not_unicode(lines)
1288
1370
self._check_lines_are_lines(lines)
1290
def get_known_graph_ancestry(self, keys):
1291
"""Get a KnownGraph instance with the ancestry of keys."""
1292
# Note that this is identical to
1293
# KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1295
parent_map, missing_keys = self._index.find_ancestry(keys)
1296
for fallback in self._fallback_vfs:
1297
if not missing_keys:
1299
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1301
parent_map.update(f_parent_map)
1302
missing_keys = f_missing_keys
1303
kg = _mod_graph.KnownGraph(parent_map)
1306
1372
def get_parent_map(self, keys):
1307
1373
"""Get a map of the graph parents of keys.
1447
1513
The returned objects should be in the order defined by 'ordering',
1448
1514
which can weave between different sources.
1449
1516
:param ordering: Must be one of 'topological' or 'groupcompress'
1450
1517
:return: List of [(source, [keys])] tuples, such that all keys are in
1451
1518
the defined order, regardless of source.
1453
1520
if ordering == 'topological':
1454
present_keys = topo_sort(parent_map)
1521
present_keys = tsort.topo_sort(parent_map)
1456
1523
# ordering == 'groupcompress'
1457
1524
# XXX: This only optimizes for the target ordering. We may need
1493
1560
# This is the group the bytes are stored in, followed by the
1494
1561
# location in the group
1495
1562
return locations[key][0]
1496
present_keys = sorted(locations.iterkeys(), key=get_group)
1497
1563
# We don't have an ordering for keys in the in-memory object, but
1498
1564
# lets process the in-memory ones first.
1499
present_keys = list(unadded_keys) + present_keys
1565
present_keys = list(unadded_keys)
1566
present_keys.extend(sorted(locations, key=get_group))
1500
1567
# Now grab all of the ones from other sources
1501
1568
source_keys = [(self, present_keys)]
1502
1569
source_keys.extend(source_result)
1546
1613
# - we encounter an unadded ref, or
1547
1614
# - we run out of keys, or
1548
1615
# - the total bytes to retrieve for this batch > BATCH_SIZE
1549
batcher = _BatchingBlockFetcher(self, locations)
1616
batcher = _BatchingBlockFetcher(self, locations,
1617
get_compressor_settings=self._get_compressor_settings)
1550
1618
for source, keys in source_keys:
1551
1619
if source is self:
1552
1620
for key in keys:
1598
1666
for _ in self._insert_record_stream(stream, random_id=False):
1669
def _get_compressor_settings(self):
1670
if self._max_bytes_to_index is None:
1671
# TODO: VersionedFiles don't know about their containing
1672
# repository, so they don't have much of an idea about their
1673
# location. So for now, this is only a global option.
1674
c = config.GlobalConfig()
1675
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1679
except ValueError as e:
1680
trace.warning('Value for '
1681
'"bzr.groupcompress.max_bytes_to_index"'
1682
' %r is not an integer'
1686
val = self._DEFAULT_MAX_BYTES_TO_INDEX
1687
self._max_bytes_to_index = val
1688
return {'max_bytes_to_index': self._max_bytes_to_index}
1690
def _make_group_compressor(self):
1691
return GroupCompressor(self._get_compressor_settings())
1601
1693
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1602
1694
reuse_blocks=True):
1603
1695
"""Internal core to insert a record stream into this container.
1627
1719
# This will go up to fulltexts for gc to gc fetching, which isn't
1629
self._compressor = GroupCompressor()
1721
self._compressor = self._make_group_compressor()
1630
1722
self._unadded_refs = {}
1631
1723
keys_to_add = []
1633
bytes = self._compressor.flush().to_bytes()
1634
self._compressor = GroupCompressor()
1725
bytes_len, chunks = self._compressor.flush().to_chunks()
1726
self._compressor = self._make_group_compressor()
1727
# Note: At this point we still have 1 copy of the fulltext (in
1728
# record and the var 'bytes'), and this generates 2 copies of
1729
# the compressed text (one for bytes, one in chunks)
1730
# TODO: Push 'chunks' down into the _access api, so that we don't
1731
# have to double compressed memory here
1732
# TODO: Figure out how to indicate that we would be happy to free
1733
# the fulltext content at this point. Note that sometimes we
1734
# will want it later (streaming CHK pages), but most of the
1735
# time we won't (everything else)
1736
data = b''.join(chunks)
1635
1738
index, start, length = self._access.add_raw_records(
1636
[(None, len(bytes))], bytes)[0]
1739
[(None, len(data))], data)[0]
1638
1741
for key, reads, refs in keys_to_add:
1639
nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1742
nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
1640
1743
self._index.add_records(nodes, random_id=random_id)
1641
1744
self._unadded_refs = {}
1642
1745
del keys_to_add[:]
1656
1759
raise errors.RevisionNotPresent(record.key, self)
1658
1761
if record.key in inserted_keys:
1659
trace.note('Insert claimed random_id=True,'
1660
' but then inserted %r two times', record.key)
1762
trace.note(gettext('Insert claimed random_id=True,'
1763
' but then inserted %r two times'), record.key)
1662
1765
inserted_keys.add(record.key)
1663
1766
if reuse_blocks:
1692
1795
' the current record, we cannot be positive'
1693
1796
' that the appropriate content was inserted.'
1695
value = "%d %d %d %d" % (block_start, block_length,
1798
value = b"%d %d %d %d" % (block_start, block_length,
1696
1799
record._start, record._end)
1697
1800
nodes = [(record.key, value, (record.parents,))]
1698
1801
# TODO: Consider buffering up many nodes to be added, not
1802
1906
"""See VersionedFiles.keys."""
1803
1907
if 'evil' in debug.debug_flags:
1804
1908
trace.mutter_callsite(2, "keys scales with size of history")
1805
sources = [self._index] + self._fallback_vfs
1909
sources = [self._index] + self._immediate_fallback_vfs
1807
1911
for source in sources:
1808
1912
result.update(source.keys())
1916
class _GCBuildDetails(object):
1917
"""A blob of data about the build details.
1919
This stores the minimal data, which then allows compatibility with the old
1920
api, without taking as much memory.
1923
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1924
'_delta_end', '_parents')
1927
compression_parent = None
1929
def __init__(self, parents, position_info):
1930
self._parents = parents
1931
(self._index, self._group_start, self._group_end, self._basis_end,
1932
self._delta_end) = position_info
1935
return '%s(%s, %s)' % (self.__class__.__name__,
1936
self.index_memo, self._parents)
1939
def index_memo(self):
1940
return (self._index, self._group_start, self._group_end,
1941
self._basis_end, self._delta_end)
1944
def record_details(self):
1945
return static_tuple.StaticTuple(self.method, None)
1947
def __getitem__(self, offset):
1948
"""Compatibility thunk to act like a tuple."""
1950
return self.index_memo
1952
return self.compression_parent # Always None
1954
return self._parents
1956
return self.record_details
1958
raise IndexError('offset out of range')
1812
1964
class _GCGraphIndex(object):
1813
1965
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1817
1969
inconsistency_fatal=True, track_new_keys=False):
1818
1970
"""Construct a _GCGraphIndex on a graph_index.
1820
:param graph_index: An implementation of bzrlib.index.GraphIndex.
1972
:param graph_index: An implementation of breezy.index.GraphIndex.
1821
1973
:param is_locked: A callback, returns True if the index is locked and
1823
1975
:param parents: If True, record knits parents, if not do not record
1902
2054
if self._parents:
1903
for key, (value, node_refs) in keys.iteritems():
2055
for key, (value, node_refs) in viewitems(keys):
1904
2056
result.append((key, value, node_refs))
1906
for key, (value, node_refs) in keys.iteritems():
2058
for key, (value, node_refs) in viewitems(keys):
1907
2059
result.append((key, value))
1908
2060
records = result
1909
2061
key_dependencies = self._key_dependencies
1989
2141
:param keys: An iterable of keys.
1990
2142
:return: A dict of key:
1991
2143
(index_memo, compression_parent, parents, record_details).
1993
opaque structure to pass to read_records to extract the raw
1996
Content that this record is built upon, may be None
1998
Logical parents of this node
2000
extra information about the content which needs to be passed to
2001
Factory.parse_record
2145
* index_memo: opaque structure to pass to read_records to extract
2147
* compression_parent: Content that this record is built upon, may
2149
* parents: Logical parents of this node
2150
* record_details: extra information about the content which needs
2151
to be passed to Factory.parse_record
2003
2153
self._check_read()
2033
2182
# each, or about 7MB. Note that it might be even more when you consider
2034
2183
# how PyInt is allocated in separate slabs. And you can't return a slab
2035
2184
# to the OS if even 1 int on it is in use. Note though that Python uses
2036
# a LIFO when re-using PyInt slots, which probably causes more
2185
# a LIFO when re-using PyInt slots, which might cause more
2037
2186
# fragmentation.
2038
2187
start = int(bits[0])
2039
2188
start = self._int_cache.setdefault(start, start)