"""Core compression logic for compressing streams of related files."""
from __future__ import absolute_import
from ..lazy_import import lazy_import
lazy_import(globals(), """
graph as _mod_graph,
from breezy.bzr import (
from breezy.i18n import gettext
from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from ..sixish import (
from .versionedfile import (
AbsentContentFactory,
ChunkedContentFactory,
FulltextContentFactory,
VersionedFilesWithFallbacks,
# Minimum number of uncompressed bytes to try fetch at once when retrieving
# groupcompress blocks.
# osutils.sha_string(b'')
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
def sort_gc_optimal(parent_map):
"""Sort and group the keys in parent_map into groupcompress order.
# groupcompress ordering is approximately reverse topological,
# properly grouped by file-id.
per_prefix_map = {}
for key, value in viewitems(parent_map):
if isinstance(key, bytes) or len(key) == 1:
for prefix in sorted(per_prefix_map):
present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
return present_keys
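# Illustrative sketch (hypothetical keys, not from the original source): for a
# parent_map such as
#   {(b'f-id', b'rev2'): ((b'f-id', b'rev1'),), (b'f-id', b'rev1'): ()}
# the keys are grouped by their prefix (the file-id) and emitted in reverse
# topological order within each group, e.g.
#   [(b'f-id', b'rev2'), (b'f-id', b'rev1')]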
class DecompressCorruption(errors.BzrError):
_fmt = "Corruption while decompressing repository file%(orig_error)s"
def __init__(self, orig_error=None):
if orig_error is not None:
self.orig_error = ", %s" % (orig_error,)
errors.BzrError.__init__(self)
# The max zlib window size is 32kB, so if we set 'max_size' output of the
# decompressor to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.
# Group Compress Block v1 Zlib
GCB_HEADER = b'gcb1z\n'
# Group Compress Block v1 Lzma
GCB_LZ_HEADER = b'gcb1l\n'
GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
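# Note (summary of the constants above, for orientation): the first six bytes
# of a serialised block identify the compressor used for the rest of the
# stream -- b'gcb1z\n' for zlib and b'gcb1l\n' for lzma -- and from_bytes()
# below picks the matching decompressor based on that header.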
def __init__(self):
# map by key? or just order in file?
self._compressor_name = None
self._z_content_chunks = None
self._z_content_decompressor = None
self._z_content_length = None
self._content_length = None
# Expand the content if required
if self._content is None:
if self._content_chunks is not None:
self._content = b''.join(self._content_chunks)
self._content_chunks = None
if self._content is None:
# We join self._z_content_chunks here, because if we are
# decompressing, then it is *very* likely that we have a single
if self._z_content_chunks is None:
raise AssertionError('No content to decompress')
z_content = b''.join(self._z_content_chunks)
if z_content == b'':
elif self._compressor_name == 'lzma':
# We don't do partial lzma decomp yet
self._content = pylzma.decompress(z_content)
elif self._compressor_name == 'zlib':
# Start a zlib decompressor
if num_bytes * 4 > self._content_length * 3:
# If we are requesting more than 3/4ths of the content,
# just extract the whole thing in a single pass
num_bytes = self._content_length
self._content = zlib.decompress(z_content)
self._z_content_decompressor = zlib.decompressobj()
# Seed the decompressor with the uncompressed bytes, so
# that the rest of the code is simplified
self._content = self._z_content_decompressor.decompress(
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
if not self._z_content_decompressor.unconsumed_tail:
self._z_content_decompressor = None
# At present, we have 2 integers for the compressed and uncompressed
# content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
# checking too far, cap the search to 14 bytes.
pos2 = data.index(b'\n', pos, pos + 14)
self._z_content_length = int(data[pos:pos2])
pos2 = data.index(b'\n', pos, pos + 14)
self._content_length = int(data[pos:pos2])
if len(data) != (pos + self._z_content_length):
# XXX: Define some GCCorrupt error ?
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
(len(data), pos, self._z_content_length))
self._z_content_chunks = (data[pos:],)
def _z_content(self):
"""Return z_content_chunks as a simple string.
Meant only to be used by the test suite.
if self._z_content_chunks is not None:
return b''.join(self._z_content_chunks)
def from_bytes(cls, bytes):
if header not in cls.GCB_KNOWN_HEADERS:
raise ValueError('bytes did not start with any of %r'
% (cls.GCB_KNOWN_HEADERS,))
if header == cls.GCB_HEADER:
out._compressor_name = 'zlib'
elif header == cls.GCB_LZ_HEADER:
out._compressor_name = 'lzma'
raise ValueError('unknown compressor: %r' % (header,))
out._parse_bytes(bytes, 6)
:return: The bytes for the content
if start == end == 0:
self._ensure_content(end)
# The bytes are 'f' or 'd' for the type, then a variable-length
# base128 integer for the content size, then the actual content
# We know that the variable-length integer won't be longer than 5
# bytes (it takes 5 bytes to encode 2^32)
c = self._content[start:start + 1]
type = 'fulltext'
raise ValueError('Unknown content control code: %s'
if end != content_start + content_len:
raise ValueError('end != len according to field header'
' %s != %s' % (end, content_start + content_len))
return self._content[content_start:end]
# Must be type delta as checked above
return apply_delta_to_source(self._content, content_start, end)
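# Illustrative layout (hypothetical sizes, not from the original source): a
# fulltext record for a 12-byte text is stored as b'f' + base128(12) + the 12
# bytes of text; a delta record is b'd' + base128(len(delta)) + the delta,
# whose body is decoded by apply_delta_to_source() above.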
def set_chunked_content(self, content_chunks, length):
"""Set the content of this block to the given chunks."""
self._content_length = length
self._content_chunks = content_chunks
self._content = None
self._z_content_chunks = None
def set_content(self, content):
"""Set the content of this block."""
self._content_length = len(content)
self._content = content
self._z_content_chunks = None
def _create_z_content_from_chunks(self, chunks):
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
# Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
# (measured peak is maybe 30MB over the above...)
compressed_chunks = list(map(compressor.compress, chunks))
compressed_chunks.append(compressor.flush())
# Ignore empty chunks
self._z_content_chunks = [c for c in compressed_chunks if c]
self._z_content_length = sum(map(len, self._z_content_chunks))
def _create_z_content(self):
if self._z_content_chunks is not None:
if self._content_chunks is not None:
chunks = self._content_chunks
chunks = (self._content,)
self._create_z_content_from_chunks(chunks)
"""Create the byte stream as a series of 'chunks'"""
self._create_z_content()
header = self.GCB_HEADER
chunks = [b'%s%d\n%d\n'
% (header, self._z_content_length, self._content_length),
chunks.extend(self._z_content_chunks)
total_len = sum(map(len, chunks))
return total_len, chunks
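# Illustrative result (hypothetical lengths, not from the original source):
# for 17 bytes of zlib data expanding to 120 bytes of content, to_chunks()
# yields a first chunk of b'gcb1z\n17\n120\n' followed by the compressed
# chunks, and total_len is the sum of all chunk lengths.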
def to_bytes(self):
"""Encode the information into a byte stream."""
total_len, chunks = self.to_chunks()
return b''.join(chunks)
def _dump(self, include_text=False):
"""Take this block, and spit out a human-readable structure.
while pos < self._content_length:
kind = self._content[pos:pos + 1]
if kind not in (b'f', b'd'):
raise ValueError('invalid kind character: %r' % (kind,))
content_len, len_len = decode_base128_int(
self._content[pos:pos + 5])
if content_len + pos > self._content_length:
raise ValueError('invalid content_len %d for record @ pos %d'
% (content_len, pos - len_len - 1))
if kind == b'f': # Fulltext
text = self._content[pos:pos+content_len]
result.append((b'f', content_len, text))
result.append((b'f', content_len))
elif kind == b'd': # Delta
delta_content = self._content[pos:pos+content_len]
# The first entry in a delta is the decompressed length
decomp_len, delta_pos = decode_base128_int(delta_content)
result.append((b'd', content_len, decomp_len, delta_info))
while delta_pos < content_len:
c = indexbytes(delta_content, delta_pos)
if c & 0x80: # Copy
text = self._content[offset:offset+length]
delta_info.append((b'c', offset, length, text))
delta_info.append((b'c', offset, length))
measured_len += length
txt = delta_content[delta_pos:delta_pos+c]
delta_info.append((b'i', c, txt))
measured_len += c
if delta_pos != content_len:
# wire bytes, something...
return self._manager._wire_bytes()
if storage_kind in ('fulltext', 'chunked'):
if self._bytes is None:
# Grab and cache the raw bytes for this entry
# and break the ref-cycle with _manager since we don't need it
self._manager._prepare_for_extract()
except zlib.error as value:
raise DecompressCorruption("zlib: " + str(value))
block = self._manager._block
self._bytes = block.extract(self.key, self._start, self._end)
# There are code paths that first extract as fulltext, and then
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
def __init__(self, block, get_compressor_settings=None):
self._block = block
# We need to preserve the ordering
self._factories = []
self._last_byte = 0
self._get_settings = get_compressor_settings
self._compressor_settings = None
def _get_compressor_settings(self):
if self._compressor_settings is not None:
return self._compressor_settings
if self._get_settings is not None:
settings = self._get_settings()
vf = GroupCompressVersionedFiles
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
self._compressor_settings = settings
return self._compressor_settings
def add_factory(self, key, parents, start, end):
if not self._factories:
new_block.set_content(self._block._content[:last_byte])
self._block = new_block
def _make_group_compressor(self):
return GroupCompressor(self._get_compressor_settings())
def _rebuild_block(self):
"""Create a new GroupCompressBlock with only the referenced texts."""
compressor = self._make_group_compressor()
tstart = time.time()
old_length = self._block._content_length
# block? It seems hard to come up with a method that it would
# expand, since we do full compression again. Perhaps based on a
# request that ends up poorly ordered?
# TODO: If the content would have expanded, then we would want to
# handle a case where we need to split the block.
# Now that we have a user-tweakable option
# (max_bytes_to_index), it is possible that one person set it
# to a very low value, causing poor compression.
delta = time.time() - tstart
self._block = new_block
trace.mutter('creating new compressed block on-the-fly in %.3fs'
# 1 line for end byte
header_lines = []
for factory in self._factories:
key_bytes = b'\x00'.join(factory.key)
parents = factory.parents
if parents is None:
parent_bytes = b'None:'
parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
record_header = b'%s\n%s\n%d\n%d\n' % (
key_bytes, parent_bytes, factory._start, factory._end)
header_lines.append(record_header)
# TODO: Can we break the refcycle at this point and set
# factory._manager = None?
header_bytes = b''.join(header_lines)
header_bytes_len = len(header_bytes)
z_header_bytes = zlib.compress(header_bytes)
z_header_bytes_len = len(z_header_bytes)
block_bytes_len, block_chunks = self._block.to_chunks()
lines.append(b'%d\n%d\n%d\n' % (
z_header_bytes_len, header_bytes_len, block_bytes_len))
lines.append(z_header_bytes)
lines.extend(block_chunks)
del z_header_bytes, block_chunks
# TODO: This is a point where we will double the memory consumption. To
# avoid this, we probably have to switch to a 'chunked' api
return b''.join(lines)
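# Illustrative wire layout (reconstructed from the code above and from_bytes
# below, field sizes hypothetical): the stream is the storage-kind marker
# (checked as b'groupcompress-block'), three length lines b'%d\n%d\n%d\n' for
# the compressed header, the uncompressed header and the block, then the
# zlib-compressed header -- one b'key\x00...\nparents\nstart\nend\n' quad per
# factory -- and finally the serialised GroupCompressBlock chunks.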
def from_bytes(cls, bytes):
# TODO: This does extra string copying, probably better to do it a
# different way. At a minimum this creates 2 copies of the
(storage_kind, z_header_len, header_len,
block_len, rest) = bytes.split(b'\n', 4)
if storage_kind != b'groupcompress-block':
raise ValueError('Unknown storage kind: %s' % (storage_kind,))
z_header_len = int(z_header_len)
if len(rest) < z_header_len:
block = GroupCompressBlock.from_bytes(block_bytes)
result = cls(block)
for start in range(0, len(header_lines), 4):
key = tuple(header_lines[start].split(b'\x00'))
parents_line = header_lines[start+1]
if parents_line == b'None:':
parents = tuple([tuple(segment.split(b'\x00'))
for segment in parents_line.split(b'\t')
start_offset = int(header_lines[start+2])
end_offset = int(header_lines[start+3])
self.labels_deltas = {}
self._delta_index = None # Set by the children
self._block = GroupCompressBlock()
self._settings = settings
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
"""Compress lines with label key.
:param key: A key tuple. It is stored in the output
for identification of the text during decompression. If the last
element is None it is replaced with the sha1 of the text -
e.g. sha1:xxxxxxx.
:param bytes: The bytes to be compressed
:param expected_sha: If non-None, the sha the lines are believed to
if sha1 == nostore_sha:
raise errors.ExistingContent()
if key[-1] is None:
key = key[:-1] + (b'sha1:' + sha1,)
start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
return sha1, start, end, type
(start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
delta_chunks = self.chunks[start_chunk:end_chunk]
stored_bytes = b''.join(delta_chunks)
kind = stored_bytes[:1]
fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
data_len = fulltext_len + 1 + offset
if data_len != len(stored_bytes):
raise ValueError('Index claimed fulltext len, but stored bytes'
' claim %s != %s'
% (len(stored_bytes), data_len))
data = stored_bytes[offset + 1:]
raise ValueError('Unknown content kind, bytes claim %s' % kind)
# XXX: This is inefficient at best
source = b''.join(self.chunks[:start_chunk])
delta_len, offset = decode_base128_int(stored_bytes[1:10])
data_len = delta_len + 1 + offset
if data_len != len(stored_bytes):
raise ValueError('Index claimed delta len, but stored bytes'
' claim %s != %s'
% (len(stored_bytes), data_len))
data = apply_delta(source, stored_bytes[offset + 1:])
data_sha1 = osutils.sha_string(data)
return data, data_sha1
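# Note (summary of the bookkeeping used above): labels_deltas maps each key to
# (start_byte, start_chunk, end_byte, end_chunk) positions into self.chunks,
# so _extract can re-join just that slice, check the b'f'/b'd' marker and the
# base128 length, and either return the fulltext or apply the delta against
# the chunks that precede it.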
"""Finish this group, creating a formatted stream.
After calling this, the compressor should no longer be used
self._block.set_chunked_content(self.chunks, self.endpoint)
self.chunks = None
self._delta_index = None
class PythonGroupCompressor(_CommonGroupCompressor):
def __init__(self, settings=None):
"""Create a GroupCompressor.
Used only if the pyrex version is not available.
super(PythonGroupCompressor, self).__init__(settings)
self._delta_index = LinesDeltaIndex([])
# The actual content is managed by LinesDeltaIndex
self.chunks = self._delta_index.lines
if delta_length > max_delta_size:
# The delta is longer than the fulltext, insert a fulltext
type = 'fulltext'
out_lines = [b'f', encode_base128_int(input_len)]
out_lines.extend(new_lines)
index_lines = [False, False]
index_lines.extend([True] * len(new_lines))
# this is a worthy delta, output it
# Update the delta_length to include those two encoded integers
out_lines[1] = encode_base128_int(delta_length)
# Before insertion
It contains code very similar to SequenceMatcher because of having a similar
task. However some key differences apply:
* there is no junk, we want a minimal edit not a human readable diff.
* we don't filter very common lines (because we don't know where a good
range will start, and after the first text we want to be emitting minimal
* we chain the left side, not the right side
* we incrementally update the adjacency matrix as new lines are provided.
* we look for matches in all of the left side, so the routine which does
the analogous task of find_longest_match does not need to filter on the
def __init__(self, settings=None):
super(PyrexGroupCompressor, self).__init__(settings)
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
def _compress(self, key, bytes, max_delta_size, soft=False):
"""see _CommonGroupCompressor._compress"""
enc_length = encode_base128_int(len(bytes))
len_mini_header = 1 + len(enc_length)
self._delta_index.add_source(bytes, len_mini_header)
new_chunks = [b'f', enc_length, bytes]
enc_length = encode_base128_int(len(delta))
len_mini_header = 1 + len(enc_length)
new_chunks = [b'd', enc_length, delta]
self._delta_index.add_delta_source(delta, len_mini_header)
# Before insertion
start = self.endpoint
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
add_callback=graph_index.add_nodes,
inconsistency_fatal=inconsistency_fatal)
access = pack_repo._DirectPackAccess({})
access.set_writer(writer, graph_index, (transport, 'newpack'))
result = GroupCompressVersionedFiles(index, access, delta)
result.stream = stream
class _BatchingBlockFetcher(object):
"""Fetch group compress blocks in batches.
:ivar total_bytes: int of expected number of bytes needed to fetch the
currently pending batch.
def __init__(self, gcvf, locations, get_compressor_settings=None):
self.gcvf = gcvf
self.locations = locations
memos_to_get_stack.pop()
block = self.batch_memos[read_memo]
self.manager = _LazyGroupContentManager(block,
get_compressor_settings=self._get_compressor_settings)
self.last_read_memo = read_memo
start, end = index_memo[3:5]
self.manager.add_factory(key, parents, start, end)
self.total_bytes = 0
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
"""A group-compress based VersionedFiles implementation."""
# This controls how the GroupCompress DeltaIndex works. Basically, we
# compute hash pointers into the source blocks (so hash(text) => text).
# However each of these references costs some memory in trade against a
# more accurate match result. For very large files, they either are
# pre-compressed and change in bulk whenever they change, or change in just
# local blocks. Either way, 'improved resolution' is not very helpful,
# versus running out of memory trying to track everything. The default max
# gives 100% sampling of a 1MB file.
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
_DEFAULT_MAX_BYTES_TO_INDEX}
def __init__(self, index, access, delta=True, _unadded_refs=None,
"""Create a GroupCompressVersionedFiles object.
:param index: The index object storing access and graph data.
:param access: The access object storing raw data.
:param delta: Whether to delta compress or just entropy compress.
:param _unadded_refs: private parameter, don't use.
:param _group_cache: private parameter, don't use.
self._index = index
self._access = access
if _unadded_refs is None:
_unadded_refs = {}
self._unadded_refs = _unadded_refs
if _group_cache is None:
_group_cache = LRUSizeCache(max_size=50*1024*1024)
self._group_cache = _group_cache
self._immediate_fallback_vfs = []
self._max_bytes_to_index = None
def without_fallbacks(self):
"""Return a clone of this object without any fallbacks configured."""
return GroupCompressVersionedFiles(self._index, self._access,
self._delta, _unadded_refs=dict(self._unadded_refs),
_group_cache=self._group_cache)
def add_lines(self, key, parents, lines, parent_texts=None,
left_matching_blocks=None, nostore_sha=None, random_id=False,
:param key: The key tuple of the text to add.
:param parents: The parents key tuples of the text to add.
:param lines: A list of lines. Each line must be a bytestring. And all
of them except the last must be terminated with \\n and contain no
other \\n's. The last line may either contain no \\n's or a single
terminating \\n. If the lines list does not meet this constraint the
add routine may error or may succeed - but you will be unable to
read the data back accurately. (Checking the lines have been split
correctly is expensive and extremely unlikely to catch bugs so it
is not done at runtime unless check_content is True.)
:param parent_texts: An optional dictionary containing the opaque
nostore_sha=nostore_sha))[0]
return sha1, length, None
def add_fallback_versioned_files(self, a_versioned_files):
"""Add a source of texts for texts not present in this knit.
:param a_versioned_files: A VersionedFiles object.
self._immediate_fallback_vfs.append(a_versioned_files)
def annotate(self, key):
"""See VersionedFiles.annotate."""
self._check_lines_not_unicode(lines)
self._check_lines_are_lines(lines)
def get_parent_map(self, keys):
"""Get a map of the graph parents of keys.
The returned objects should be in the order defined by 'ordering',
which can weave between different sources.
:param ordering: Must be one of 'topological' or 'groupcompress'
:return: List of [(source, [keys])] tuples, such that all keys are in
the defined order, regardless of source.
if ordering == 'topological':
present_keys = tsort.topo_sort(parent_map)
# ordering == 'groupcompress'
# XXX: This only optimizes for the target ordering. We may need
# This is the group the bytes are stored in, followed by the
# location in the group
return locations[key][0]
# We don't have an ordering for keys in the in-memory object, but
# let's process the in-memory ones first.
present_keys = list(unadded_keys)
present_keys.extend(sorted(locations, key=get_group))
# Now grab all of the ones from other sources
source_keys = [(self, present_keys)]
source_keys.extend(source_result)
# - we encounter an unadded ref, or
# - we run out of keys, or
# - the total bytes to retrieve for this batch > BATCH_SIZE
batcher = _BatchingBlockFetcher(self, locations,
get_compressor_settings=self._get_compressor_settings)
for source, keys in source_keys:
if source is self:
for key in keys:
for _ in self._insert_record_stream(stream, random_id=False):
def _get_compressor_settings(self):
if self._max_bytes_to_index is None:
# TODO: VersionedFiles don't know about their containing
# repository, so they don't have much of an idea about their
# location. So for now, this is only a global option.
c = config.GlobalConfig()
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
except ValueError as e:
trace.warning('Value for '
'"bzr.groupcompress.max_bytes_to_index"'
' %r is not an integer'
val = self._DEFAULT_MAX_BYTES_TO_INDEX
self._max_bytes_to_index = val
return {'max_bytes_to_index': self._max_bytes_to_index}
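# Illustrative usage (assumed value, not from the original source): setting
#   bzr.groupcompress.max_bytes_to_index = 2097152
# in the global configuration read by config.GlobalConfig() raises the cap to
# 2MB; a non-integer value is warned about and the 1MB default is used.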
def _make_group_compressor(self):
return GroupCompressor(self._get_compressor_settings())
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
reuse_blocks=True):
"""Internal core to insert a record stream into this container.
# This will go up to fulltexts for gc to gc fetching, which isn't
self._compressor = self._make_group_compressor()
self._unadded_refs = {}
keys_to_add = []
bytes_len, chunks = self._compressor.flush().to_chunks()
self._compressor = self._make_group_compressor()
# Note: At this point we still have 1 copy of the fulltext (in
# record and the var 'bytes'), and this generates 2 copies of
# the compressed text (one for bytes, one in chunks)
# TODO: Push 'chunks' down into the _access api, so that we don't
# have to double compressed memory here
# TODO: Figure out how to indicate that we would be happy to free
# the fulltext content at this point. Note that sometimes we
# will want it later (streaming CHK pages), but most of the
# time we won't (everything else)
data = b''.join(chunks)
index, start, length = self._access.add_raw_records(
[(None, len(data))], data)[0]
for key, reads, refs in keys_to_add:
nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
self._index.add_records(nodes, random_id=random_id)
self._unadded_refs = {}
del keys_to_add[:]
raise errors.RevisionNotPresent(record.key, self)
if record.key in inserted_keys:
trace.note(gettext('Insert claimed random_id=True,'
' but then inserted %r two times'), record.key)
inserted_keys.add(record.key)
if reuse_blocks:
' the current record, we cannot be positive'
' that the appropriate content was inserted.'
value = b"%d %d %d %d" % (block_start, block_length,
record._start, record._end)
nodes = [(record.key, value, (record.parents,))]
# TODO: Consider buffering up many nodes to be added, not
"""See VersionedFiles.keys."""
if 'evil' in debug.debug_flags:
trace.mutter_callsite(2, "keys scales with size of history")
sources = [self._index] + self._immediate_fallback_vfs
for source in sources:
result.update(source.keys())
class _GCBuildDetails(object):
"""A blob of data about the build details.
This stores the minimal data, which then allows compatibility with the old
api, without taking as much memory.
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
'_delta_end', '_parents')
compression_parent = None
def __init__(self, parents, position_info):
self._parents = parents
(self._index, self._group_start, self._group_end, self._basis_end,
self._delta_end) = position_info
return '%s(%s, %s)' % (self.__class__.__name__,
self.index_memo, self._parents)
def index_memo(self):
return (self._index, self._group_start, self._group_end,
self._basis_end, self._delta_end)
def record_details(self):
return static_tuple.StaticTuple(self.method, None)
def __getitem__(self, offset):
"""Compatibility thunk to act like a tuple."""
return self.index_memo
return self.compression_parent # Always None
return self._parents
return self.record_details
raise IndexError('offset out of range')
class _GCGraphIndex(object):
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
inconsistency_fatal=True, track_new_keys=False):
"""Construct a _GCGraphIndex on a graph_index.
:param graph_index: An implementation of breezy.index.GraphIndex.
:param is_locked: A callback, returns True if the index is locked and
:param parents: If True, record knits parents, if not do not record
if self._parents:
for key, (value, node_refs) in viewitems(keys):
result.append((key, value, node_refs))
for key, (value, node_refs) in viewitems(keys):
result.append((key, value))
records = result
key_dependencies = self._key_dependencies
:param keys: An iterable of keys.
:return: A dict of key:
(index_memo, compression_parent, parents, record_details).
* index_memo: opaque structure to pass to read_records to extract
* compression_parent: Content that this record is built upon, may
* parents: Logical parents of this node
* record_details: extra information about the content which needs
to be passed to Factory.parse_record
self._check_read()
def _node_to_position(self, node):
"""Convert an index value to position details."""
bits = node[2].split(b' ')
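# Note (reconstructed from the writers above, e.g. the b"%d %d %d %d" value in
# _insert_record_stream): node[2] holds space-separated integers -- the
# group's start offset and length in the pack, then the record's start and
# end offsets inside the uncompressed group -- which are parsed back out here.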
# It would be nice not to read the entire gzip.
# start and stop are put into _int_cache because they are very common.
# They define the 'group' that an entry is in, and many groups can have
# each, or about 7MB. Note that it might be even more when you consider
# how PyInt is allocated in separate slabs. And you can't return a slab
# to the OS if even 1 int on it is in use. Note though that Python uses
# a LIFO when re-using PyInt slots, which might cause more
# fragmentation.
start = int(bits[0])
start = self._int_cache.setdefault(start, start)