"""Core compression logic for compressing streams of related files."""

from __future__ import absolute_import

from ..lazy_import import lazy_import
lazy_import(globals(), """
    graph as _mod_graph,
from breezy.bzr import (
from breezy.i18n import gettext
""")

from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from ..sixish import (
from .versionedfile import (
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    VersionedFilesWithFallbacks,
    )

# Minimum number of uncompressed bytes to try fetch at once when retrieving
# groupcompress blocks.

_USE_LZMA = False and (pylzma is not None)

# osutils.sha_string('')
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.
    """
    # groupcompress ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    for key, value in viewitems(parent_map):
        if isinstance(key, bytes) or len(key) == 1:
            prefix = b''
        else:
            prefix = key[0]
        try:
            per_prefix_map[prefix][key] = value
        except KeyError:
            per_prefix_map[prefix] = {key: value}

    present_keys = []
    for prefix in sorted(per_prefix_map):
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
    return present_keys
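# Illustrative sketch only (not part of the original module); the keys and
# parents below are made-up example values:
#
#   parent_map = {
#       (b'f1', b'rev2'): ((b'f1', b'rev1'),),
#       (b'f1', b'rev1'): (),
#       (b'f2', b'rev1'): (),
#   }
#   sort_gc_optimal(parent_map)
#   # -> keys grouped by prefix (b'f1' first, then b'f2'), each group in
#   #    reverse topological order, i.e. children before parents:
#   #    [(b'f1', b'rev2'), (b'f1', b'rev1'), (b'f2', b'rev1')]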
class DecompressCorruption(errors.BzrError):

    _fmt = "Corruption while decompressing repository file%(orig_error)s"

    def __init__(self, orig_error=None):
        if orig_error is not None:
            self.orig_error = ", %s" % (orig_error,)
        else:
            self.orig_error = ''
        errors.BzrError.__init__(self)


# The max zlib window size is 32kB, so if we set 'max_size' output of the
# decompressor to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.
_ZLIB_DECOMP_WINDOW = 32 * 1024
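# Illustrative sketch only (not part of the original module), restating the
# guarantee above; 'z_bytes' and 'num_bytes' are assumed example values:
#
#   decomp = zlib.decompressobj()
#   data = decomp.decompress(z_bytes, num_bytes + _ZLIB_DECOMP_WINDOW)
#   # Because zlib never has to look further back than its 32kB window,
#   # asking for num_bytes + 32kB of output yields at least num_bytes of
#   # plain text (unless the stream ends first); the remainder stays in
#   # decomp.unconsumed_tail for later, as _ensure_content() does below.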
class GroupCompressBlock(object):

    # Group Compress Block v1 Zlib
    GCB_HEADER = b'gcb1z\n'
    # Group Compress Block v1 Lzma
    GCB_LZ_HEADER = b'gcb1l\n'
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)

    def __init__(self):
        # map by key? or just order in file?
        self._compressor_name = None
        self._z_content_chunks = None
        self._z_content_decompressor = None
        self._z_content_length = None
        self._content_length = None
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = b''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # large string to pass to zlib.
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = b''.join(self._z_content_chunks)
            if z_content == b'':
                self._content = b''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = data.index(b'\n', pos, pos + 14)
        self._z_content_length = int(data[pos:pos2])
        pos = pos2 + 1
        pos2 = data.index(b'\n', pos, pos + 14)
        self._content_length = int(data[pos:pos2])
        pos = pos2 + 1
        if len(data) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(data), pos, self._z_content_length))
        self._z_content_chunks = (data[pos:],)
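    # For illustration only (not part of the original source): the serialised
    # block parsed above, and produced by to_chunks()/to_bytes() below, is
    # laid out as
    #
    #   b'gcb1z\n'      6-byte header (GCB_HEADER; b'gcb1l\n' for lzma)
    #   b'<z_len>\n'    compressed length as base10 ascii
    #   b'<len>\n'      uncompressed length as base10 ascii
    #   <z_len bytes>   the compressed content itself
    #
    # so a block whose compressed payload is 10 bytes and expands to 100
    # bytes starts with b'gcb1z\n10\n100\n'.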
    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite.
        """
        if self._z_content_chunks is not None:
            return b''.join(self._z_content_chunks)
    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        header = bytes[:6]
        if header not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        if header == cls.GCB_HEADER:
            out._compressor_name = 'zlib'
        elif header == cls.GCB_LZ_HEADER:
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (header,))
        out._parse_bytes(bytes, 6)
        return out
    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :return: The bytes for the content
        """
        if start == end == 0:
            return b''
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start:start + 1]
        if c == b'f':
            type = 'fulltext'
        else:
            if c != b'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                             ' %s != %s' % (end, content_start + content_len))
        if c == b'f':
            return self._content[content_start:end]
        # Must be type delta as checked above
        return apply_delta_to_source(self._content, content_start, end)
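    # For illustration only (not part of the original source): a fulltext
    # record inside a block, as read by extract() above, is simply
    #
    #   b'f' + encode_base128_int(len(text)) + text
    #
    # e.g. an 11-byte text is stored as b'f', the single length byte b'\x0b',
    # then the 11 bytes of text, so (start, end) offsets of (0, 13) cover the
    # control byte, the length and the text.  Delta records use b'd' followed
    # by the encoded delta instead.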
    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None
    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = list(map(compressor.compress, chunks))
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))
    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)
    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        header = self.GCB_HEADER
        chunks = [b'%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                  ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks
    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return b''.join(chunks)
    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.
        """
        result = []
        pos = 0
        while pos < self._content_length:
            kind = self._content[pos:pos + 1]
            pos += 1
            if kind not in (b'f', b'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == b'f':  # Fulltext
                if include_text:
                    text = self._content[pos:pos + content_len]
                    result.append((b'f', content_len, text))
                else:
                    result.append((b'f', content_len))
            elif kind == b'd':  # Delta
                delta_content = self._content[pos:pos + content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append((b'd', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = indexbytes(delta_content, delta_pos)
                    delta_pos += 1
                    if c & 0x80:  # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset + length]
                            delta_info.append((b'c', offset, length, text))
                        else:
                            delta_info.append((b'c', offset, length))
                        measured_len += length
                    else:  # Insert
                        if include_text:
                            txt = delta_content[delta_pos:delta_pos + c]
                        else:
                            txt = b''
                        delta_info.append((b'i', c, txt))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:
                # wire bytes, something...
                return self._manager._wire_bytes()
        if storage_kind in ('fulltext', 'chunked'):
            if self._bytes is None:
                # Grab and cache the raw bytes for this entry
                # and break the ref-cycle with _manager since we don't need it
                # anymore
                try:
                    self._manager._prepare_for_extract()
                except zlib.error as value:
                    raise DecompressCorruption("zlib: " + str(value))
                block = self._manager._block
                self._bytes = block.extract(self.key, self._start, self._end)
                # There are code paths that first extract as fulltext, and then
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0
        self._get_settings = get_compressor_settings
        self._compressor_settings = None
    def _get_compressor_settings(self):
        if self._compressor_settings is not None:
            return self._compressor_settings
        settings = None
        if self._get_settings is not None:
            settings = self._get_settings()
        if settings is None:
            vf = GroupCompressVersionedFiles
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
        self._compressor_settings = settings
        return self._compressor_settings
    def add_factory(self, key, parents, start, end):
        if not self._factories:

        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block

    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())
    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = self._make_group_compressor()
        tstart = time.time()
        old_length = self._block._content_length

        # block? It seems hard to come up with a method that it would
        # expand, since we do full compression again. Perhaps based on a
        # request that ends up poorly ordered?
        # TODO: If the content would have expanded, then we would want to
        #       handle a case where we need to split the block.
        #       Now that we have a user-tweakable option
        #       (max_bytes_to_index), it is possible that one person set it
        #       to a very low value, causing poor compression.
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = b'\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            record_header = b'%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = b''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append(b'%d\n%d\n%d\n' % (
            z_header_bytes_len, header_bytes_len, block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return b''.join(lines)
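    # Illustration only (not part of the original source) of the wire format
    # produced above and re-read by from_bytes() below:
    #
    #   b'groupcompress-block\n'
    #   b'<z_header_len>\n<header_len>\n<block_len>\n'
    #   <zlib-compressed header>  one 'key\nparents\nstart\nend\n' record per
    #                             factory, key elements joined with b'\x00'
    #   <block bytes>             as produced by GroupCompressBlock.to_chunks()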
    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split(b'\n', 4)
        if storage_kind != b'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:
            raise ValueError('Compressed header len shorter than all bytes')
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in range(0, len(header_lines), 4):
            key = tuple(header_lines[start].split(b'\x00'))
            parents_line = header_lines[start + 1]
            if parents_line == b'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split(b'\x00'))
                                 for segment in parents_line.split(b'\t')
                                 if segment])
            start_offset = int(header_lines[start + 2])
            end_offset = int(header_lines[start + 3])
        if sha1 == nostore_sha:
            raise errors.ExistingContent()
        if key[-1] is None:
            key = key[:-1] + (b'sha1:' + sha1,)

        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
        return sha1, start, end, type
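    # For illustration only (not part of the original source); the text and
    # file id are assumed example values.  When the caller passes a key whose
    # last element is None, compress() fills it in from the text's sha1:
    #
    #   text = b'some text\n'
    #   sha1 = osutils.sha_string(text)       # same helper used elsewhere here
    #   key = (b'file-id', None)
    #   key = key[:-1] + (b'sha1:' + sha1,)   # -> (b'file-id', b'sha1:<hex>')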
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = b''.join(delta_chunks)
        kind = stored_bytes[:1]
        if kind == b'f':
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = stored_bytes[offset + 1:]
        else:
            if kind != b'd':
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
            # XXX: This is inefficient at best
            source = b''.join(self.chunks[:start_chunk])
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = apply_delta(source, stored_bytes[offset + 1:])
        data_sha1 = osutils.sha_string(data)
        return data, data_sha1
    def flush(self):
        """Finish this group, creating a formatted stream.

        After calling this, the compressor should no longer be used
        """
        # TODO: this causes us to 'bloat' to 2x the size of content in the
        #       group. This has an impact for 'commit' of large objects.
        #       One possibility is to use self._content_chunks, and be lazy and
        #       only fill out self._content as a full string when we actually
        #       need it. That would at least drop the peak memory consumption
        #       for 'commit' down to ~1x the size of the largest file, at a
        #       cost of increased complexity within this code. 2x is still <<
        #       3x the size of the largest file, so we are doing ok.
        self._block.set_chunked_content(self.chunks, self.endpoint)
        self.chunks = None
        self._delta_index = None
        return self._block
class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self, settings=None):
        """Create a GroupCompressor.

        Used only if the pyrex version is not available.
        """
        super(PythonGroupCompressor, self).__init__(settings)
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = [b'f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = b'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        # Before insertion
    It contains code very similar to SequenceMatcher because it performs a
    similar task. However some key differences apply:

    * there is no junk, we want a minimal edit not a human readable diff.
    * we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting minimal
      edits only.
    * we chain the left side, not the right side
    * we incrementally update the adjacency matrix as new lines are provided.
    * we look for matches in all of the left side, so the routine which does
      the analogous task of find_longest_match does not need to filter on the
      left side.
    """

    def __init__(self, settings=None):
        super(PyrexGroupCompressor, self).__init__(settings)
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
            enc_length = encode_base128_int(len(bytes))
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = [b'f', enc_length, bytes]
        else:
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = [b'd', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        # Before insertion
        start = self.endpoint
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
                              add_callback=graph_index.add_nodes,
                              inconsistency_fatal=inconsistency_fatal)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream
class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations, get_compressor_settings=None):
        self.gcvf = gcvf
        self.locations = locations

                memos_to_get_stack.pop()
            block = self.batch_memos[read_memo]
            self.manager = _LazyGroupContentManager(block,
                get_compressor_settings=self._get_compressor_settings)
            self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)

        self.total_bytes = 0
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
    """A group-compress based VersionedFiles implementation."""

    # This controls how the GroupCompress DeltaIndex works. Basically, we
    # compute hash pointers into the source blocks (so hash(text) => text).
    # However each of these references costs some memory in trade against a
    # more accurate match result. For very large files, they either are
    # pre-compressed and change in bulk whenever they change, or change in just
    # local blocks. Either way, 'improved resolution' is not very helpful,
    # versus running out of memory trying to track everything. The default max
    # gives 100% sampling of a 1MB file.
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
    def __init__(self, index, access, delta=True, _unadded_refs=None,
                 _group_cache=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        :param _group_cache: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        if _group_cache is None:
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
        self._group_cache = _group_cache
        self._immediate_fallback_vfs = []
        self._max_bytes_to_index = None

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs),
            _group_cache=self._group_cache)
    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
        :param parents: The parents key tuples of the text to add.
        :param lines: A list of lines. Each line must be a bytestring. And all
            of them except the last must be terminated with \\n and contain no
            other \\n's. The last line may either contain no \\n's or a single
            terminating \\n. If the lines list does not meet this constraint
            the add routine may error or may succeed - but you will be unable
            to read the data back accurately. (Checking the lines have been
            split correctly is expensive and extremely unlikely to catch bugs
            so it is not done at runtime unless check_content is True.)
        :param parent_texts: An optional dictionary containing the opaque

    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
        """See VersionedFiles._add_text()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if not isinstance(text, bytes):
            raise errors.BzrBadParameterUnicode("text")
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them an empty
            # tuple instead.
            parents = ()

        self._check_lines_not_unicode(lines)
        self._check_lines_are_lines(lines)
    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.

        The returned objects should be in the order defined by 'ordering',
        which can weave between different sources.

        :param ordering: Must be one of 'topological' or 'groupcompress'
        :return: List of [(source, [keys])] tuples, such that all keys are in
            the defined order, regardless of source.
        if ordering == 'topological':
            present_keys = tsort.topo_sort(parent_map)
        else:
            # ordering == 'groupcompress'
            # XXX: This only optimizes for the target ordering. We may need

            def get_group(key):
                # This is the group the bytes are stored in, followed by the
                # location in the group
                return locations[key][0]
            # We don't have an ordering for keys in the in-memory object, but
            # lets process the in-memory ones first.
            present_keys = list(unadded_keys)
            present_keys.extend(sorted(locations, key=get_group))
            # Now grab all of the ones from other sources
            source_keys = [(self, present_keys)]
            source_keys.extend(source_result)
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations,
            get_compressor_settings=self._get_compressor_settings)
        for source, keys in source_keys:
            if source is self:
                for key in keys:

        for _ in self._insert_record_stream(stream, random_id=False):
            pass
    def _get_compressor_settings(self):
        if self._max_bytes_to_index is None:
            # TODO: VersionedFiles don't know about their containing
            #       repository, so they don't have much of an idea about their
            #       location. So for now, this is only a global option.
            c = config.GlobalConfig()
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
            if val is not None:
                try:
                    val = int(val)
                except ValueError as e:
                    trace.warning('Value for '
                                  '"bzr.groupcompress.max_bytes_to_index"'
                                  ' %r is not an integer'
                                  % (val,))
                    val = None
            if val is None:
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
            self._max_bytes_to_index = val
        return {'max_bytes_to_index': self._max_bytes_to_index}
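    # For illustration only (not part of the original source): the value read
    # above is an ordinary global config option, so a user could cap indexing
    # at 256kB per text with something like
    #
    #   c = config.GlobalConfig()
    #   c.set_user_option('bzr.groupcompress.max_bytes_to_index', '262144')
    #
    # The option is stored as text and parsed back to an int above, with a
    # warning and the default used if it is not a valid integer.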
    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                              reuse_blocks=True):
        """Internal core to insert a record stream into this container.
        """
        # This will go up to fulltexts for gc to gc fetching, which isn't
        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []

        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            data = b''.join(chunks)
            index, start, length = self._access.add_raw_records(
                [(None, len(data))], data)[0]
            nodes = []
            for key, reads, refs in keys_to_add:
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]
                raise errors.RevisionNotPresent(record.key, self)

            if record.key in inserted_keys:
                trace.note(gettext('Insert claimed random_id=True,'
                                   ' but then inserted %r two times'),
                           record.key)
            inserted_keys.add(record.key)
            if reuse_blocks:

                            ' the current record, we cannot be positive'
                            ' that the appropriate content was inserted.'
                    value = b"%d %d %d %d" % (block_start, block_length,
                                              record._start, record._end)
                    nodes = [(record.key, value, (record.parents,))]
                    # TODO: Consider buffering up many nodes to be added, not
    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index] + self._immediate_fallback_vfs
        result = set()
        for source in sources:
            result.update(source.keys())
        return result
class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
                               self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent  # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
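    # For illustration only (not part of the original source); the values are
    # assumed examples.  _GCBuildDetails acts like the 4-tuple the older API
    # returned:
    #
    #   details = _GCBuildDetails(parents=((b'parent-key',),),
    #                             position_info=('idx', 0, 100, 0, 100))
    #   details[0]   # == details.index_memo -> ('idx', 0, 100, 0, 100)
    #   details[1]   # compression_parent, always None
    #   details[2]   # == ((b'parent-key',),)
    #   details[3]   # == details.record_details -> StaticTuple('group', None)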
class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
                 add_callback=None, track_external_parent_refs=False,
                 inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback, returns True if the index is locked and
            thus usable.
        :param parents: If True, record knits parents, if not do not record
            parents.

        if self._parents:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value))
        records = result
        key_dependencies = self._key_dependencies
        :param keys: An iterable of keys.
        :return: A dict of key:
            (index_memo, compression_parent, parents, record_details).

            * index_memo: opaque structure to pass to read_records to extract
              the raw data
            * compression_parent: Content that this record is built upon, may
              be None
            * parents: Logical parents of this node
            * record_details: extra information about the content which needs
              to be passed to Factory.parse_record
        """
        self._check_read()
    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(b' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)