"""Core compression logic for compressing streams of related files."""

from __future__ import absolute_import

from ..lazy_import import lazy_import

lazy_import(globals(), """
from breezy.bzr import (
from breezy.i18n import gettext

from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from .versionedfile import (
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    VersionedFilesWithFallbacks,
    UnavailableRepresentation,

# Minimum number of uncompressed bytes to try fetch at once when retrieving
# groupcompress blocks.

# osutils.sha_string(b'')
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
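# (_null_sha1 is the SHA-1 of the empty string, i.e.
# hashlib.sha1(b'').hexdigest() as ASCII bytes; compress() uses it to
# short-circuit empty texts without hashing or storing them.)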


def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.
    return present_keys
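# For illustration (not used by this module): "groupcompress order" groups
# keys by their prefix (e.g. file-id) and sorts each group newest-first, so
# for a parent_map like {('f1', 'rev2'): (('f1', 'rev1'),), ('f1', 'rev1'): ()}
# the result would start [('f1', 'rev2'), ('f1', 'rev1'), ...], keeping
# related texts adjacent for delta compression.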


class DecompressCorruption(errors.BzrError):

    _fmt = "Corruption while decompressing repository file%(orig_error)s"

    def __init__(self, orig_error=None):
        if orig_error is not None:
            self.orig_error = ", %s" % (orig_error,)
        errors.BzrError.__init__(self)


# The max zlib window size is 32kB, so if we set 'max_size' output of the
# decompressor to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.
_ZLIB_DECOMP_WINDOW = 32 * 1024
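# A rough sketch of the intended use (assuming a plain zlib.decompressobj();
# see _ensure_content for the real logic): when only num_bytes of the
# uncompressed content are needed,
#   decomp = zlib.decompressobj()
#   data = decomp.decompress(z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
# is guaranteed to yield at least num_bytes, provided the stream actually
# contains that much.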


class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = data.index(b'\n', pos, pos + 14)
        self._z_content_length = int(data[pos:pos2])
        pos2 = data.index(b'\n', pos, pos + 14)
        self._content_length = int(data[pos:pos2])
        if len(data) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(data), pos, self._z_content_length))
        self._z_content_chunks = (data[pos:],)
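        # For reference, the layout parsed above is (sizes in ASCII base10,
        # one per line, e.g. for a zlib block):
        #   b'gcb1z\n'               <- 6-byte header, checked by from_bytes
        #   b'<z_content_length>\n'
        #   b'<content_length>\n'
        #   <z_content_length bytes of compressed content>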

    def _z_content(self):
        Meant only to be used by the test suite.
        if self._z_content_chunks is not None:
            return b''.join(self._z_content_chunks)

    def from_bytes(cls, bytes):
        if header not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        if header == cls.GCB_HEADER:
            out._compressor_name = 'zlib'
        elif header == cls.GCB_LZ_HEADER:
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (header,))
        out._parse_bytes(bytes, 6)

        :return: The bytes for the content
        if start == end == 0:
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
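        # (Illustration: the base128 encoding is the usual little-endian
        # varint, 7 bits per byte with the high bit as a continuation flag,
        # so e.g. a content size of 50000 is stored as b'\xd0\x86\x03'.)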
        c = self._content[start:start + 1]
            type = 'fulltext'
                raise ValueError('Unknown content control code: %s'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                             ' %s != %s' % (end, content_start + content_len))
            return [self._content[content_start:end]]
        # Must be type delta as checked above
        return [apply_delta_to_source(self._content, content_start, end)]

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""

        while pos < self._content_length:
            kind = self._content[pos:pos + 1]
            if kind not in (b'f', b'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == b'f':  # Fulltext
                    text = self._content[pos:pos + content_len]
                    result.append((b'f', content_len, text))
                    result.append((b'f', content_len))
            elif kind == b'd':  # Delta
                delta_content = self._content[pos:pos + content_len]
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append((b'd', content_len, decomp_len, delta_info))
                while delta_pos < content_len:
                    c = delta_content[delta_pos]
                         delta_pos) = decode_copy_instruction(delta_content, c,
                            text = self._content[offset:offset + length]
                            delta_info.append((b'c', offset, length, text))
                            delta_info.append((b'c', offset, length))
                        measured_len += length
                        txt = delta_content[delta_pos:delta_pos + c]
                        delta_info.append((b'i', c, txt))
                        measured_len += c
                if delta_pos != content_len:

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
                                     self.key, self._first)

    def _extract_bytes(self):
        # Grab and cache the raw bytes for this entry
        # and break the ref-cycle with _manager since we don't need it
            self._manager._prepare_for_extract()
        except zlib.error as value:
            raise DecompressCorruption("zlib: " + str(value))
        block = self._manager._block
        self._chunks = block.extract(self.key, self._start, self._end)
        # There are code paths that first extract as fulltext, and then
        # extract as storage_kind (smart fetch). So we don't break the
        # refcycle here, but instead in manager.get_record_stream()

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            # wire bytes, something...
            return self._manager._wire_bytes()
        if storage_kind in ('fulltext', 'chunked', 'lines'):
            if self._chunks is None:
                self._extract_bytes()
            if storage_kind == 'fulltext':
                return b''.join(self._chunks)
            elif storage_kind == 'chunked':
                return self._chunks
            elif storage_kind == 'lines':
                return osutils.chunks_to_lines(self._chunks)
        raise UnavailableRepresentation(self.key, storage_kind,
                                        self.storage_kind)

    def iter_bytes_as(self, storage_kind):
        if self._chunks is None:
            self._extract_bytes()
        if storage_kind == 'chunked':
            return iter(self._chunks)
        elif storage_kind == 'lines':
            return iter(osutils.chunks_to_lines(self._chunks))
        raise UnavailableRepresentation(self.key, storage_kind,
                                        self.storage_kind)


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
                              # current size, and still be considered
    _full_block_size = 4 * 1024 * 1024
    _full_mixed_block_size = 2 * 1024 * 1024
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
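    # (These thresholds feed the heuristics that decide whether a block can be
    # served as-is, trimmed, or recompressed when a request only needs part of
    # its content; the exact policy lives in the rebuild/trim helpers of this
    # class.)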

    def __init__(self, block, get_compressor_settings=None):
        self._block = block

        old_length = self._block._content_length
        for factory in self._factories:
            chunks = factory.get_bytes_as('chunked')
            chunks_len = factory.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            (found_sha1, start_point, end_point,
             type) = compressor.compress(
                 factory.key, chunks, chunks_len, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = start_point

        # 1 line for end byte
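        # (For example, a record for key ('file-id', 'rev-id') with no parents
        # contributes roughly b'file-id\x00rev-id\nNone:\n0\n123\n' -- the
        # start/end offsets depend on where the text sits in the block.)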
        header_lines = []
        for factory in self._factories:
            key_bytes = b'\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            record_header = b'%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = b''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append(b'%d\n%d\n%d\n' % (
            z_header_bytes_len, header_bytes_len, block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        # avoid this, we probably have to switch to a 'chunked' api
        return b''.join(lines)

    def from_bytes(cls, bytes):
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in range(0, len(header_lines), 4):
            key = tuple(header_lines[start].split(b'\x00'))
            parents_line = header_lines[start + 1]
            if parents_line == b'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split(b'\x00'))
                                 for segment in parents_line.split(b'\t')
            start_offset = int(header_lines[start + 2])
            end_offset = int(header_lines[start + 3])
            result.add_factory(key, parents, start_offset, end_offset)

        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._delta_index = None  # Set by the children
        self._block = GroupCompressBlock()
        if settings is None:
            self._settings = {}
        else:
            self._settings = settings

    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is b'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param chunks: Chunks of bytes to be compressed
        :param length: Length of chunks
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
        :seealso VersionedFiles.add_lines:
        if length == 0:  # empty, like a dir entry, etc
            if nostore_sha == _null_sha1:
                raise ExistingContent()
            return _null_sha1, 0, 0, 'fulltext'
        # we assume someone knew what they were doing when they passed it in
        if expected_sha is not None:
            sha1 = expected_sha
        else:
            sha1 = osutils.sha_strings(chunks)
        if nostore_sha is not None:
            if sha1 == nostore_sha:
                raise ExistingContent()
        if key[-1] is None:
            key = key[:-1] + (b'sha1:' + sha1,)

        start, end, type = self._compress(key, chunks, length, length / 2, soft)
        return sha1, start, end, type
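    # A hedged usage sketch (GroupCompressor is whichever concrete compressor
    # class this module exports; the key and texts below are made up):
    #   compressor = GroupCompressor()
    #   sha1, start, end, kind = compressor.compress(
    #       (b'file-id', b'rev-1'), [b'one\n', b'two\n'], 8, None)
    #   chunks, sha = compressor.extract((b'file-id', b'rev-1'))
    #   assert b''.join(chunks) == b'one\ntwo\n' and sha == sha1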

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output for identification
            of the text during decompression.
        :param chunks: The chunks of bytes to be compressed
        :param input_len: The length of the chunks
        :param max_delta_size: The size above which we issue a fulltext instead

        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over chunks and the sha1.
        (start_byte, start_chunk, end_byte,
         end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = b''.join(delta_chunks)
        kind = stored_bytes[:1]
        if kind == b'f':
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = [stored_bytes[offset + 1:]]
        else:
            if kind != b'd':
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
            # XXX: This is inefficient at best
            source = b''.join(self.chunks[:start_chunk])
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = [apply_delta(source, stored_bytes[offset + 1:])]
        data_sha1 = osutils.sha_strings(data)
        return data, data_sha1

        """Finish this group, creating a formatted stream.
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        new_lines = osutils.chunks_to_lines(chunks)
        out_lines, index_lines = self._delta_index.make_delta(
            new_lines, bytes_length=input_len, soft=soft)
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = [b'f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        # Before insertion

        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
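        # (Assumption: 'max_bytes_to_index' caps how much of each added source
        # the DeltaIndex will index when searching for delta matches; the
        # default of 0 defers to the compiled extension's behaviour.)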

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                                 ' somehow the DeltaIndex got out of sync with'
                                 ' the output lines')
        bytes = b''.join(chunks)
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            type = 'fulltext'
            enc_length = encode_base128_int(input_len)
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = [b'f', enc_length] + chunks
        else:
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = [b'd', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        # Before insertion
        start = self.endpoint

        graph_index = BTreeBuilder(reference_lists=ref_length,
                                   key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
                              add_callback=graph_index.add_nodes,
                              inconsistency_fatal=inconsistency_fatal)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
                                           self._delta, _unadded_refs=dict(
                                               self._unadded_refs),
                                           _group_cache=self._group_cache)

    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
            back to future add_lines calls in the parent_texts dictionary.
        self._index._check_write_ok()
        self._check_lines_not_unicode(lines)
        self._check_lines_are_lines(lines)
        return self.add_content(
            ChunkedContentFactory(
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def add_content(self, factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
        """Add a text to the store.

        :param factory: A ContentFactory that can be used to retrieve the key,
            parents and contents.
        :param parent_texts: An optional dictionary containing the opaque
            representations of some or all of the parents of version_id to
            allow delta optimisations. VERY IMPORTANT: the texts must be those
            returned by add_lines or data corruption can be caused.
        :param left_matching_blocks: a hint about which areas are common
            between the text and its left-hand-parent. The format is
            the SequenceMatcher.get_matching_blocks format.
        :param nostore_sha: Raise ExistingContent and do not add the lines to
            the versioned file if the digest of the lines matches this.
        :param random_id: If True a random id has been selected rather than
            an id determined by some deterministic process such as a converter
            from a foreign VCS. When True the backend may choose not to check
            for uniqueness of the resulting key within the versioned file, so
            this should only be done when the result is expected to be unique
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        self._index._check_write_ok()
        parents = factory.parents
        self._check_add(factory.key, random_id)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
        # double handling for now. Make it work until then.
        sha1, length = list(self._insert_record_stream(
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):

                key_to_source_map)
        elif ordering == 'as-requested':
            source_keys = self._get_as_requested_source_keys(orig_keys,
                locations, unadded_keys, key_to_source_map)
        else:
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
            # Otherwise we thrash the _group_cache and destroy performance
            source_keys = self._get_io_ordered_source_keys(locations,
                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:

        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []

            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            index, start, length = self._access.add_raw_record(
                None, bytes_len, chunks)
            for key, reads, refs in keys_to_add:
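                # ('reads' is the b'%d %d' start/end pair recorded for each
                # key when it was compressed, so each stored index value ends
                # up as b'<block offset> <block length> <start> <end>'.)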
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]
                    raise AssertionError('No insert_manager set')
                if insert_manager is not record._manager:
                    raise AssertionError('insert_manager does not match'
                                         ' the current record, we cannot be positive'
                                         ' that the appropriate content was inserted.'
                                         )
                value = b"%d %d %d %d" % (block_start, block_length,
                                          record._start, record._end)
                nodes = [(record.key, value, (record.parents,))]
                # TODO: Consider buffering up many nodes to be added, not
                #       sure how much overhead this has, but we're seeing
                self._index.add_records(nodes, random_id=random_id)
                chunks = record.get_bytes_as('chunked')
            except UnavailableRepresentation:
                adapter_key = record.storage_kind, 'chunked'
                adapter = get_adapter(adapter_key)
                chunks = adapter.get_bytes(record, 'chunked')
            chunks_len = record.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            if max_fulltext_len < chunks_len:
                max_fulltext_len = chunks_len
                max_fulltext_prefix = prefix
            (found_sha1, start_point, end_point,
             type) = self._compressor.compress(
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
                 nostore_sha=nostore_sha)
            # delta_ratio = float(chunks_len) / (end_point - start_point)
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                    and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4 * 1024 * 1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                  and end_point > 2 * 1024 * 1024):
                start_new_block = True
            else:
                start_new_block = False
            if start_new_block:
                self._compressor.pop_last()
                max_fulltext_len = chunks_len
                (found_sha1, start_point, end_point,
                 type) = self._compressor.compress(
                     record.key, chunks, chunks_len, record.sha1)
            if record.key[-1] is None:
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
            else:
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1, chunks_len
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append(
                (key, b'%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            self._compressor = None

        # but we need to setup a list of records to visit.
        # we need key, position, length
        for key_idx, record in enumerate(self.get_record_stream(keys,
                                                                'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            if pb is not None:
                pb.update('Walking content', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            for line in record.iter_bytes_as('lines'):
                yield line, key
        if pb is not None:
            pb.update('Walking content', total, total)

    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
                 add_callback=None, track_external_parent_refs=False,
                 inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback, returns True if the index is locked and
        :param parents: If True, record knits parents, if not do not record