"""Core compression logic for compressing streams of related files."""
from __future__ import absolute_import
from ..lazy_import import lazy_import
lazy_import(globals(), """
graph as _mod_graph,
from breezy.bzr import (
from breezy.i18n import gettext
from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from ..sixish import (
from .versionedfile import (
AbsentContentFactory,
ChunkedContentFactory,
FulltextContentFactory,
VersionedFilesWithFallbacks,
# Minimum number of uncompressed bytes to try fetch at once when retrieving
# groupcompress blocks.
# osutils.sha_string('')
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
def sort_gc_optimal(parent_map):
"""Sort and group the keys in parent_map into groupcompress order.
# groupcompress ordering is approximately reverse topological,
# properly grouped by file-id.
per_prefix_map = {}
for key, value in viewitems(parent_map):
if isinstance(key, bytes) or len(key) == 1:
for prefix in sorted(per_prefix_map):
present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
return present_keys
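# Illustrative sketch (not part of the original module): for a parent_map of
# prefixed keys such as
#   {(b'f-id', b'rev-2'): ((b'f-id', b'rev-1'),),
#    (b'f-id', b'rev-1'): ()}
# sort_gc_optimal() groups keys by prefix (file-id) and emits each group
# newest-first, e.g. [(b'f-id', b'rev-2'), (b'f-id', b'rev-1')], which is the
# ordering the group compressor handles best.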
class DecompressCorruption(errors.BzrError):
_fmt = "Corruption while decompressing repository file%(orig_error)s"
def __init__(self, orig_error=None):
if orig_error is not None:
self.orig_error = ", %s" % (orig_error,)
errors.BzrError.__init__(self)
# The max zlib window size is 32kB, so if we set 'max_size' output of the
# decompressor to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.
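# For reference, a minimal sketch of the partial-decompression pattern used
# below (assumed variable names, not part of the original module):
#   decompressor = zlib.decompressobj()
#   data = decompressor.decompress(z_content, num_bytes + 32 * 1024)
#   # anything zlib did not need yet stays in decompressor.unconsumed_tail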
# Group Compress Block v1 Zlib
GCB_HEADER = b'gcb1z\n'
# Group Compress Block v1 Lzma
GCB_LZ_HEADER = b'gcb1l\n'
GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
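# Informal summary added for readability (the authoritative layout is the
# to_chunks()/_parse_bytes() code below): a serialised block looks roughly like
#   b'gcb1z\n'                 6-byte header (b'gcb1l\n' for lzma blocks)
#   b'<z_content_length>\n'    compressed payload length, base-10 ascii
#   b'<content_length>\n'      uncompressed payload length
#   <compressed content bytes>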
def __init__(self):
# map by key? or just order in file?
self._compressor_name = None
self._z_content_chunks = None
self._z_content_decompressor = None
self._z_content_length = None
self._content_length = None
# Expand the content if required
if self._content is None:
if self._content_chunks is not None:
self._content = b''.join(self._content_chunks)
self._content_chunks = None
if self._content is None:
# We join self._z_content_chunks here, because if we are
# decompressing, then it is *very* likely that we have a single
if self._z_content_chunks is None:
raise AssertionError('No content to decompress')
z_content = b''.join(self._z_content_chunks)
if z_content == b'':
elif self._compressor_name == 'lzma':
# We don't do partial lzma decomp yet
self._content = pylzma.decompress(z_content)
elif self._compressor_name == 'zlib':
# Start a zlib decompressor
if num_bytes * 4 > self._content_length * 3:
# If we are requesting more than 3/4ths of the content,
# just extract the whole thing in a single pass
num_bytes = self._content_length
self._content = zlib.decompress(z_content)
self._z_content_decompressor = zlib.decompressobj()
# Seed the decompressor with the uncompressed bytes, so
# that the rest of the code is simplified
self._content = self._z_content_decompressor.decompress(
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
if not self._z_content_decompressor.unconsumed_tail:
self._z_content_decompressor = None
# At present, we have 2 integers for the compressed and uncompressed
# content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
# checking too far, cap the search to 14 bytes.
pos2 = data.index(b'\n', pos, pos + 14)
self._z_content_length = int(data[pos:pos2])
pos2 = data.index(b'\n', pos, pos + 14)
self._content_length = int(data[pos:pos2])
if len(data) != (pos + self._z_content_length):
# XXX: Define some GCCorrupt error ?
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
(len(data), pos, self._z_content_length))
self._z_content_chunks = (data[pos:],)
def _z_content(self):
"""Return z_content_chunks as a simple string.
Meant only to be used by the test suite.
if self._z_content_chunks is not None:
return b''.join(self._z_content_chunks)
def from_bytes(cls, bytes):
if header not in cls.GCB_KNOWN_HEADERS:
raise ValueError('bytes did not start with any of %r'
% (cls.GCB_KNOWN_HEADERS,))
if header == cls.GCB_HEADER:
out._compressor_name = 'zlib'
elif header == cls.GCB_LZ_HEADER:
out._compressor_name = 'lzma'
raise ValueError('unknown compressor: %r' % (header,))
out._parse_bytes(bytes, 6)
:return: The bytes for the content
if start == end == 0:
self._ensure_content(end)
# The bytes are 'f' or 'd' for the type, then a variable-length
# base128 integer for the content size, then the actual content
# We know that the variable-length integer won't be longer than 5
# bytes (it takes 5 bytes to encode 2^32)
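# For example (illustrative only): a fulltext record for b'hello\n' is stored
# as b'f' + encode_base128_int(6) + b'hello\n', while a delta record is
# b'd' + encode_base128_int(len(delta)) + delta, where the delta itself begins
# with the base128-encoded length of the text it expands to.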
c = self._content[start:start + 1]
type = 'fulltext'
raise ValueError('Unknown content control code: %s'
if end != content_start + content_len:
raise ValueError('end != len according to field header'
' %s != %s' % (end, content_start + content_len))
return self._content[content_start:end]
# Must be type delta as checked above
return apply_delta_to_source(self._content, content_start, end)
def set_chunked_content(self, content_chunks, length):
"""Set the content of this block to the given chunks."""
self._content_length = length
self._content_chunks = content_chunks
self._content = None
self._z_content_chunks = None
def set_content(self, content):
"""Set the content of this block."""
self._content_length = len(content)
self._content = content
self._z_content_chunks = None
def _create_z_content_from_chunks(self, chunks):
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
# Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
# (measured peak is maybe 30MB over the above...)
compressed_chunks = list(map(compressor.compress, chunks))
compressed_chunks.append(compressor.flush())
# Ignore empty chunks
self._z_content_chunks = [c for c in compressed_chunks if c]
self._z_content_length = sum(map(len, self._z_content_chunks))
def _create_z_content(self):
if self._z_content_chunks is not None:
if self._content_chunks is not None:
chunks = self._content_chunks
chunks = (self._content,)
self._create_z_content_from_chunks(chunks)
"""Create the byte stream as a series of 'chunks'"""
self._create_z_content()
header = self.GCB_HEADER
chunks = [b'%s%d\n%d\n'
% (header, self._z_content_length, self._content_length),
chunks.extend(self._z_content_chunks)
total_len = sum(map(len, chunks))
return total_len, chunks
"""Encode the information into a byte stream."""
total_len, chunks = self.to_chunks()
return b''.join(chunks)
def _dump(self, include_text=False):
"""Take this block, and spit out a human-readable structure.
while pos < self._content_length:
kind = self._content[pos:pos + 1]
if kind not in (b'f', b'd'):
raise ValueError('invalid kind character: %r' % (kind,))
content_len, len_len = decode_base128_int(
self._content[pos:pos + 5])
if content_len + pos > self._content_length:
raise ValueError('invalid content_len %d for record @ pos %d'
% (content_len, pos - len_len - 1))
if kind == b'f': # Fulltext
text = self._content[pos:pos+content_len]
result.append((b'f', content_len, text))
result.append((b'f', content_len))
elif kind == b'd': # Delta
delta_content = self._content[pos:pos+content_len]
# The first entry in a delta is the decompressed length
decomp_len, delta_pos = decode_base128_int(delta_content)
result.append((b'd', content_len, decomp_len, delta_info))
while delta_pos < content_len:
c = indexbytes(delta_content, delta_pos)
if c & 0x80: # Copy
text = self._content[offset:offset+length]
delta_info.append((b'c', offset, length, text))
delta_info.append((b'c', offset, length))
measured_len += length
txt = delta_content[delta_pos:delta_pos+c]
delta_info.append((b'i', c, txt))
measured_len += c
if delta_pos != content_len:
# wire bytes, something...
return self._manager._wire_bytes()
if storage_kind in ('fulltext', 'chunked'):
if self._bytes is None:
# Grab and cache the raw bytes for this entry
# and break the ref-cycle with _manager since we don't need it
self._manager._prepare_for_extract()
except zlib.error as value:
raise DecompressCorruption("zlib: " + str(value))
block = self._manager._block
self._bytes = block.extract(self.key, self._start, self._end)
# There are code paths that first extract as fulltext, and then
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
def __init__(self, block, get_compressor_settings=None):
self._block = block
# We need to preserve the ordering
self._factories = []
self._last_byte = 0
self._get_settings = get_compressor_settings
self._compressor_settings = None
def _get_compressor_settings(self):
if self._compressor_settings is not None:
return self._compressor_settings
if self._get_settings is not None:
settings = self._get_settings()
vf = GroupCompressVersionedFiles
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
self._compressor_settings = settings
return self._compressor_settings
def add_factory(self, key, parents, start, end):
if not self._factories:
new_block.set_content(self._block._content[:last_byte])
self._block = new_block
def _make_group_compressor(self):
return GroupCompressor(self._get_compressor_settings())
def _rebuild_block(self):
"""Create a new GroupCompressBlock with only the referenced texts."""
compressor = self._make_group_compressor()
tstart = time.time()
old_length = self._block._content_length
# block? It seems hard to come up with a method that it would
# expand, since we do full compression again. Perhaps based on a
# request that ends up poorly ordered?
# TODO: If the content would have expanded, then we would want to
# handle a case where we need to split the block.
# Now that we have a user-tweakable option
# (max_bytes_to_index), it is possible that one person set it
# to a very low value, causing poor compression.
delta = time.time() - tstart
self._block = new_block
trace.mutter('creating new compressed block on-the-fly in %.3fs'
# 1 line for end byte
header_lines = []
for factory in self._factories:
key_bytes = b'\x00'.join(factory.key)
parents = factory.parents
if parents is None:
parent_bytes = b'None:'
parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
record_header = b'%s\n%s\n%d\n%d\n' % (
key_bytes, parent_bytes, factory._start, factory._end)
header_lines.append(record_header)
# TODO: Can we break the refcycle at this point and set
# factory._manager = None?
header_bytes = b''.join(header_lines)
header_bytes_len = len(header_bytes)
z_header_bytes = zlib.compress(header_bytes)
z_header_bytes_len = len(z_header_bytes)
block_bytes_len, block_chunks = self._block.to_chunks()
lines.append(b'%d\n%d\n%d\n' % (
z_header_bytes_len, header_bytes_len, block_bytes_len))
lines.append(z_header_bytes)
lines.extend(block_chunks)
del z_header_bytes, block_chunks
# TODO: This is a point where we will double the memory consumption. To
# avoid this, we probably have to switch to a 'chunked' api
return b''.join(lines)
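# Rough wire layout built above and parsed again in from_bytes() below
# (informal summary, not a normative spec):
#   b'groupcompress-block\n'
#   b'<len of zlib-compressed header>\n<len of header>\n<len of block>\n'
#   <zlib-compressed header: key\nparents\nstart\nend\n per factory>
#   <the serialised GroupCompressBlock>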
def from_bytes(cls, bytes):
# TODO: This does extra string copying, probably better to do it a
# different way. At a minimum this creates 2 copies of the
(storage_kind, z_header_len, header_len,
block_len, rest) = bytes.split(b'\n', 4)
if storage_kind != b'groupcompress-block':
raise ValueError('Unknown storage kind: %s' % (storage_kind,))
z_header_len = int(z_header_len)
if len(rest) < z_header_len:
block = GroupCompressBlock.from_bytes(block_bytes)
result = cls(block)
for start in range(0, len(header_lines), 4):
key = tuple(header_lines[start].split(b'\x00'))
parents_line = header_lines[start+1]
if parents_line == b'None:':
parents = tuple([tuple(segment.split(b'\x00'))
for segment in parents_line.split(b'\t')
start_offset = int(header_lines[start+2])
end_offset = int(header_lines[start+3])
self.labels_deltas = {}
self._delta_index = None # Set by the children
self._block = GroupCompressBlock()
self._settings = settings
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
"""Compress lines with label key.
:param key: A key tuple. It is stored in the output
for identification of the text during decompression. If the last
element is b'None' it is replaced with the sha1 of the text -
e.g. sha1:xxxxxxx.
:param bytes: The bytes to be compressed
:param expected_sha: If non-None, the sha the lines are believed to
if sha1 == nostore_sha:
raise errors.ExistingContent()
if key[-1] is None:
key = key[:-1] + (b'sha1:' + sha1,)
start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
return sha1, start, end, type
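# Typical use (illustrative sketch; the key and text below are made up):
#   compressor = GroupCompressor()
#   sha1, start, end, kind = compressor.compress(
#       (b'file-id', b'rev-id'), b'some text\n', None)
#   block = compressor.flush()   # finish the group as a GroupCompressBlock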
(start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
delta_chunks = self.chunks[start_chunk:end_chunk]
stored_bytes = b''.join(delta_chunks)
kind = stored_bytes[:1]
if kind == b'f':
fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
data_len = fulltext_len + 1 + offset
if data_len != len(stored_bytes):
raise ValueError('Index claimed fulltext len, but stored bytes'
' claim %s != %s'
% (len(stored_bytes), data_len))
data = stored_bytes[offset + 1:]
raise ValueError('Unknown content kind, bytes claim %s' % kind)
# XXX: This is inefficient at best
source = b''.join(self.chunks[:start_chunk])
delta_len, offset = decode_base128_int(stored_bytes[1:10])
data_len = delta_len + 1 + offset
if data_len != len(stored_bytes):
raise ValueError('Index claimed delta len, but stored bytes'
' claim %s != %s'
% (len(stored_bytes), data_len))
data = apply_delta(source, stored_bytes[offset + 1:])
data_sha1 = osutils.sha_string(data)
return data, data_sha1
"""Finish this group, creating a formatted stream.
After calling this, the compressor should no longer be used
self._block.set_chunked_content(self.chunks, self.endpoint)
self.chunks = None
self._delta_index = None
class PythonGroupCompressor(_CommonGroupCompressor):
def __init__(self, settings=None):
"""Create a GroupCompressor.
Used only if the pyrex version is not available.
super(PythonGroupCompressor, self).__init__(settings)
self._delta_index = LinesDeltaIndex([])
# The actual content is managed by LinesDeltaIndex
self.chunks = self._delta_index.lines
if delta_length > max_delta_size:
# The delta is longer than the fulltext, insert a fulltext
type = 'fulltext'
out_lines = [b'f', encode_base128_int(input_len)]
out_lines.extend(new_lines)
index_lines = [False, False]
index_lines.extend([True] * len(new_lines))
# this is a worthy delta, output it
# Update the delta_length to include those two encoded integers
out_lines[1] = encode_base128_int(delta_length)
# Before insertion
It contains code very similar to SequenceMatcher because of having a similar
task. However some key differences apply:
* there is no junk, we want a minimal edit not a human readable diff.
* we don't filter very common lines (because we don't know where a good
range will start, and after the first text we want to be emitting minimal
* we chain the left side, not the right side
* we incrementally update the adjacency matrix as new lines are provided.
* we look for matches in all of the left side, so the routine which does
the analogous task of find_longest_match does not need to filter on the
def __init__(self, settings=None):
super(PyrexGroupCompressor, self).__init__(settings)
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
def _compress(self, key, bytes, max_delta_size, soft=False):
"""see _CommonGroupCompressor._compress"""
enc_length = encode_base128_int(len(bytes))
len_mini_header = 1 + len(enc_length)
self._delta_index.add_source(bytes, len_mini_header)
new_chunks = [b'f', enc_length, bytes]
enc_length = encode_base128_int(len(delta))
len_mini_header = 1 + len(enc_length)
new_chunks = [b'd', enc_length, delta]
self._delta_index.add_delta_source(delta, len_mini_header)
# Before insertion
start = self.endpoint
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
add_callback=graph_index.add_nodes,
inconsistency_fatal=inconsistency_fatal)
access = pack_repo._DirectPackAccess({})
access.set_writer(writer, graph_index, (transport, 'newpack'))
result = GroupCompressVersionedFiles(index, access, delta)
result.stream = stream
class _BatchingBlockFetcher(object):
"""Fetch group compress blocks in batches.
:ivar total_bytes: int of expected number of bytes needed to fetch the
currently pending batch.
def __init__(self, gcvf, locations, get_compressor_settings=None):
self.gcvf = gcvf
self.locations = locations
memos_to_get_stack.pop()
block = self.batch_memos[read_memo]
self.manager = _LazyGroupContentManager(block,
get_compressor_settings=self._get_compressor_settings)
self.last_read_memo = read_memo
start, end = index_memo[3:5]
self.manager.add_factory(key, parents, start, end)
self.total_bytes = 0
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
"""A group-compress based VersionedFiles implementation."""
# This controls how the GroupCompress DeltaIndex works. Basically, we
# compute hash pointers into the source blocks (so hash(text) => text).
# However each of these references costs some memory in trade against a
# more accurate match result. For very large files, they either are
# pre-compressed and change in bulk whenever they change, or change in just
# local blocks. Either way, 'improved resolution' is not very helpful,
# versus running out of memory trying to track everything. The default max
# gives 100% sampling of a 1MB file.
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
_DEFAULT_MAX_BYTES_TO_INDEX}
def __init__(self, index, access, delta=True, _unadded_refs=None,
"""Create a GroupCompressVersionedFiles object.
:param index: The index object storing access and graph data.
:param access: The access object storing raw data.
:param delta: Whether to delta compress or just entropy compress.
:param _unadded_refs: private parameter, don't use.
:param _group_cache: private parameter, don't use.
self._index = index
self._access = access
if _unadded_refs is None:
_unadded_refs = {}
self._unadded_refs = _unadded_refs
if _group_cache is None:
_group_cache = LRUSizeCache(max_size=50*1024*1024)
self._group_cache = _group_cache
self._immediate_fallback_vfs = []
self._max_bytes_to_index = None
def without_fallbacks(self):
"""Return a clone of this object without any fallbacks configured."""
return GroupCompressVersionedFiles(self._index, self._access,
self._delta, _unadded_refs=dict(self._unadded_refs),
_group_cache=self._group_cache)
def add_lines(self, key, parents, lines, parent_texts=None,
left_matching_blocks=None, nostore_sha=None, random_id=False,
:param key: The key tuple of the text to add.
:param parents: The parents key tuples of the text to add.
:param lines: A list of lines. Each line must be a bytestring. And all
of them except the last must be terminated with \\n and contain no
other \\n's. The last line may either contain no \\n's or a single
terminating \\n. If the lines list does not meet this constraint the
add routine may error or may succeed - but you will be unable to
read the data back accurately. (Checking the lines have been split
correctly is expensive and extremely unlikely to catch bugs so it
is not done at runtime unless check_content is True.)
:param parent_texts: An optional dictionary containing the opaque
nostore_sha=nostore_sha))[0]
return sha1, length, None
def add_fallback_versioned_files(self, a_versioned_files):
"""Add a source of texts for texts not present in this knit.
:param a_versioned_files: A VersionedFiles object.
self._immediate_fallback_vfs.append(a_versioned_files)
def annotate(self, key):
"""See VersionedFiles.annotate."""
self._check_lines_not_unicode(lines)
self._check_lines_are_lines(lines)
def get_parent_map(self, keys):
"""Get a map of the graph parents of keys.
The returned objects should be in the order defined by 'ordering',
which can weave between different sources.
:param ordering: Must be one of 'topological' or 'groupcompress'
:return: List of [(source, [keys])] tuples, such that all keys are in
the defined order, regardless of source.
if ordering == 'topological':
present_keys = tsort.topo_sort(parent_map)
# ordering == 'groupcompress'
# XXX: This only optimizes for the target ordering. We may need
# This is the group the bytes are stored in, followed by the
# location in the group
return locations[key][0]
# We don't have an ordering for keys in the in-memory object, but
# lets process the in-memory ones first.
present_keys = list(unadded_keys)
present_keys.extend(sorted(locations, key=get_group))
# Now grab all of the ones from other sources
source_keys = [(self, present_keys)]
source_keys.extend(source_result)
# - we encounter an unadded ref, or
# - we run out of keys, or
# - the total bytes to retrieve for this batch > BATCH_SIZE
batcher = _BatchingBlockFetcher(self, locations,
get_compressor_settings=self._get_compressor_settings)
for source, keys in source_keys:
if source is self:
for key in keys:
for _ in self._insert_record_stream(stream, random_id=False):
def _get_compressor_settings(self):
if self._max_bytes_to_index is None:
# TODO: VersionedFiles don't know about their containing
# repository, so they don't have much of an idea about their
# location. So for now, this is only a global option.
c = config.GlobalConfig()
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
except ValueError as e:
trace.warning('Value for '
'"bzr.groupcompress.max_bytes_to_index"'
' %r is not an integer'
val = self._DEFAULT_MAX_BYTES_TO_INDEX
self._max_bytes_to_index = val
return {'max_bytes_to_index': self._max_bytes_to_index}
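# The option read above can be set in the user's global configuration, for
# example (illustrative value, in bytes):
#   bzr.groupcompress.max_bytes_to_index = 2097152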
def _make_group_compressor(self):
return GroupCompressor(self._get_compressor_settings())
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
reuse_blocks=True):
"""Internal core to insert a record stream into this container.
# This will go up to fulltexts for gc to gc fetching, which isn't
self._compressor = self._make_group_compressor()
self._unadded_refs = {}
keys_to_add = []
bytes_len, chunks = self._compressor.flush().to_chunks()
self._compressor = self._make_group_compressor()
# Note: At this point we still have 1 copy of the fulltext (in
# record and the var 'bytes'), and this generates 2 copies of
# the compressed text (one for bytes, one in chunks)
# TODO: Push 'chunks' down into the _access api, so that we don't
# have to double compressed memory here
# TODO: Figure out how to indicate that we would be happy to free
# the fulltext content at this point. Note that sometimes we
# will want it later (streaming CHK pages), but most of the
# time we won't (everything else)
data = b''.join(chunks)
index, start, length = self._access.add_raw_records(
[(None, len(data))], data)[0]
for key, reads, refs in keys_to_add:
nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
self._index.add_records(nodes, random_id=random_id)
self._unadded_refs = {}
del keys_to_add[:]
raise errors.RevisionNotPresent(record.key, self)
if record.key in inserted_keys:
trace.note(gettext('Insert claimed random_id=True,'
' but then inserted %r two times'), record.key)
inserted_keys.add(record.key)
if reuse_blocks:
' the current record, we cannot be positive'
' that the appropriate content was inserted.'
value = b"%d %d %d %d" % (block_start, block_length,
record._start, record._end)
nodes = [(record.key, value, (record.parents,))]
# TODO: Consider buffering up many nodes to be added, not
"""See VersionedFiles.keys."""
if 'evil' in debug.debug_flags:
trace.mutter_callsite(2, "keys scales with size of history")
sources = [self._index] + self._immediate_fallback_vfs
for source in sources:
result.update(source.keys())
class _GCBuildDetails(object):
"""A blob of data about the build details.
This stores the minimal data, which then allows compatibility with the old
api, without taking as much memory.
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
'_delta_end', '_parents')
compression_parent = None
def __init__(self, parents, position_info):
self._parents = parents
(self._index, self._group_start, self._group_end, self._basis_end,
self._delta_end) = position_info
return '%s(%s, %s)' % (self.__class__.__name__,
self.index_memo, self._parents)
def index_memo(self):
return (self._index, self._group_start, self._group_end,
self._basis_end, self._delta_end)
def record_details(self):
return static_tuple.StaticTuple(self.method, None)
def __getitem__(self, offset):
"""Compatibility thunk to act like a tuple."""
return self.index_memo
return self.compression_parent # Always None
return self._parents
return self.record_details
raise IndexError('offset out of range')
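# Illustrative note (added for readability): _GCBuildDetails stands in for
# the 4-tuple the older API returned, so indexing appears to map as
#   bd[0] -> index_memo, bd[1] -> compression_parent (always None here),
#   bd[2] -> parents, bd[3] -> record_details.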
class _GCGraphIndex(object):
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
inconsistency_fatal=True, track_new_keys=False):
"""Construct a _GCGraphIndex on a graph_index.
:param graph_index: An implementation of breezy.index.GraphIndex.
:param is_locked: A callback, returns True if the index is locked and
:param parents: If True, record knits parents, if not do not record
if self._parents:
for key, (value, node_refs) in viewitems(keys):
result.append((key, value, node_refs))
for key, (value, node_refs) in viewitems(keys):
result.append((key, value))
records = result
key_dependencies = self._key_dependencies
:param keys: An iterable of keys.
:return: A dict of key:
(index_memo, compression_parent, parents, record_details).
* index_memo: opaque structure to pass to read_records to extract
* compression_parent: Content that this record is built upon, may
* parents: Logical parents of this node
* record_details: extra information about the content which needs
to be passed to Factory.parse_record
self._check_read()
def _node_to_position(self, node):
"""Convert an index value to position details."""
bits = node[2].split(b' ')
# It would be nice not to read the entire gzip.
# start and stop are put into _int_cache because they are very common.
# They define the 'group' that an entry is in, and many groups can have
# each, or about 7MB. Note that it might be even more when you consider
# how PyInt is allocated in separate slabs. And you can't return a slab
# to the OS if even 1 int on it is in use. Note though that Python uses
# a LIFO when re-using PyInt slots, which might cause more
# fragmentation.
start = int(bits[0])
start = self._int_cache.setdefault(start, start)
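# For reference (informal summary): the index value split above is the
# bytestring stored when records are inserted, roughly of the form
#   b'<group start> <group length> <start within group> <end within group>'
# so bits[0]/bits[1] locate the compressed group on disk and the remaining
# fields locate the record inside the uncompressed group content.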