"""Core compression logic for compressing streams of related files."""

from ..lazy_import import lazy_import
lazy_import(globals(), """
from breezy.bzr import (
    pack_repo,
    )
from breezy.i18n import gettext
""")

from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from .versionedfile import (
    AbsentContentFactory,

    return present_keys


class DecompressCorruption(errors.BzrError):

    _fmt = "Corruption while decompressing repository file%(orig_error)s"

    def __init__(self, orig_error=None):
        if orig_error is not None:
            self.orig_error = ", %s" % (orig_error,)
        else:
            self.orig_error = ""
        errors.BzrError.__init__(self)
88
# The max zlib window size is 32kB, so if we set 'max_size' output of the
104
89
# decompressor to the requested bytes + 32kB, then we should guarantee
105
90
# num_bytes coming out.
106
_ZLIB_DECOMP_WINDOW = 32 * 1024
91
_ZLIB_DECOMP_WINDOW = 32*1024
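# Illustrative sketch (not from the original source): because zlib's window is
# 32kB, asking the decompressor for num_bytes + _ZLIB_DECOMP_WINDOW of output
# is enough to ensure at least num_bytes of plain content come out when the
# stream contains them, e.g.:
#   d = zlib.decompressobj()
#   out = d.decompress(z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
#   # if len(out) < num_bytes, nothing was held back by the limit:
#   assert len(out) >= num_bytes or d.unconsumed_tail == b''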


class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = data.index(b'\n', pos, pos + 14)
        self._z_content_length = int(data[pos:pos2])
        pos = pos2 + 1
        pos2 = data.index(b'\n', pos, pos + 14)
        self._content_length = int(data[pos:pos2])
        pos = pos2 + 1
        if len(data) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(data), pos, self._z_content_length))
        self._z_content_chunks = (data[pos:],)
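        # Layout sketch (informal, inferred from the parsing above): a block is
        # the 6-byte header, then the compressed and uncompressed lengths as
        # ASCII decimals each terminated by '\n', then the compressed payload:
        #   <header (6 bytes)><z_content_length>\n<content_length>\n<z_content>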

    def _z_content(self):
        """Meant only to be used by the test suite."""
        if self._z_content_chunks is not None:
            return b''.join(self._z_content_chunks)

    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        header = bytes[:6]
        if header not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        if header == cls.GCB_HEADER:
            out._compressor_name = 'zlib'
        elif header == cls.GCB_LZ_HEADER:
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (header,))
        out._parse_bytes(bytes, 6)
        return out
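    # Usage sketch (illustrative only, assuming the to_bytes()/to_chunks()
    # serializers defined elsewhere in this class):
    #   block = GroupCompressBlock.from_bytes(stored_bytes)
    #   block._ensure_content()           # decompression happens lazily
    #   chunks = block.extract(key, start, end)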

        :return: The bytes for the content
        """
        if start == end == 0:
            return []
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start:start + 1]
        if c == b'f':
            type = 'fulltext'
        else:
            if c != b'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                             ' %s != %s' % (end, content_start + content_len))
        if c == b'f':
            return [self._content[content_start:end]]
        # Must be type delta as checked above
        return [apply_delta_to_source(self._content, content_start, end)]
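        # Record layout sketch (informal): each entry inside the uncompressed
        # block content is
        #   b'f'|b'd'  <base128 content length>  <fulltext or delta bytes>
        # so extraction only needs the (start, end) offsets recorded when the
        # record was inserted to slice one record back out of self._content.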

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""

        while pos < self._content_length:
            kind = self._content[pos:pos + 1]
            pos += 1
            if kind not in (b'f', b'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == b'f':  # Fulltext
                if include_text:
                    text = self._content[pos:pos + content_len]
                    result.append((b'f', content_len, text))
                else:
                    result.append((b'f', content_len))
            elif kind == b'd':  # Delta
                delta_content = self._content[pos:pos + content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append((b'd', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = delta_content[delta_pos]
                    delta_pos += 1
                    if c & 0x80:  # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset + length]
                            delta_info.append((b'c', offset, length, text))
                        else:
                            delta_info.append((b'c', offset, length))
                        measured_len += length
                    else:  # Insert
                        txt = delta_content[delta_pos:delta_pos + c]
                        delta_info.append((b'i', c, txt))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
                                     self.key, self._first)

    def _extract_bytes(self):
        # Grab and cache the raw bytes for this entry
        # and break the ref-cycle with _manager since we don't need it
        # anymore
        try:
            self._manager._prepare_for_extract()
        except zlib.error as value:
            raise DecompressCorruption("zlib: " + str(value))
        block = self._manager._block
        self._chunks = block.extract(self.key, self._start, self._end)
        # There are code paths that first extract as fulltext, and then
        # extract as storage_kind (smart fetch). So we don't break the
        # refcycle here, but instead in manager.get_record_stream()

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            # wire bytes, something...
            return self._manager._wire_bytes()
        if storage_kind in ('fulltext', 'chunked', 'lines'):
            if self._chunks is None:
                self._extract_bytes()
            if storage_kind == 'fulltext':
                return b''.join(self._chunks)
            elif storage_kind == 'chunked':
                return self._chunks
            else:
                return osutils.chunks_to_lines(self._chunks)
        raise errors.UnavailableRepresentation(self.key, storage_kind,
                                               self.storage_kind)

    def iter_bytes_as(self, storage_kind):
        if self._chunks is None:
            self._extract_bytes()
        if storage_kind == 'chunked':
            return iter(self._chunks)
        elif storage_kind == 'lines':
            return iter(osutils.chunks_to_lines(self._chunks))
        raise errors.UnavailableRepresentation(self.key, storage_kind,
                                               self.storage_kind)


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
                              # current size, and still be considered
                              # resizable
    _full_block_size = 4 * 1024 * 1024
    _full_mixed_block_size = 2 * 1024 * 1024
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        self._block = block

        old_length = self._block._content_length
        for factory in self._factories:
            chunks = factory.get_bytes_as('chunked')
            chunks_len = factory.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            (found_sha1, start_point, end_point,
             type) = compressor.compress(
                 factory.key, chunks, chunks_len, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = start_point

        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = b'\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            record_header = b'%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = b''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append(b'%d\n%d\n%d\n' % (
            z_header_bytes_len, header_bytes_len, block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return b''.join(lines)
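    # Wire layout sketch for _wire_bytes() (informal, following the code
    # above): after the leading lines built earlier in the method come three
    # ASCII lengths and then the two payloads,
    #   <z_header_bytes_len>\n<header_bytes_len>\n<block_bytes_len>\n
    #   <zlib-compressed per-record header><serialised GroupCompressBlock>
    # where each per-record header entry is key\nparents\nstart\nend\n.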

    @classmethod
    def from_bytes(cls, bytes):
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in range(0, len(header_lines), 4):
            key = tuple(header_lines[start].split(b'\x00'))
            parents_line = header_lines[start + 1]
            if parents_line == b'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split(b'\x00'))
                                 for segment in parents_line.split(b'\t')
                                 if segment])
            start_offset = int(header_lines[start + 2])
            end_offset = int(header_lines[start + 3])
            result.add_factory(key, parents, start_offset, end_offset)
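        # Round-trip sketch (illustrative only): _wire_bytes() and from_bytes()
        # are intended as inverses, so the receiving side can rebuild a
        # manager and read records back out, e.g.:
        #   manager = _LazyGroupContentManager.from_bytes(wire_bytes)
        #   for factory in manager.get_record_stream():
        #       chunks = factory.get_bytes_as('chunked')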

        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._delta_index = None  # Set by the children
        self._block = GroupCompressBlock()
        if settings is None:
            self._settings = {}
        else:
            self._settings = settings

    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
                 soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is b'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param chunks: Chunks of bytes to be compressed
        :param length: Length of chunks
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        """
        if expected_sha is not None:
            sha1 = expected_sha
        else:
            sha1 = osutils.sha_strings(chunks)
        if nostore_sha is not None:
            if sha1 == nostore_sha:
                raise errors.ExistingContent()
        if key[-1] is None:
            key = key[:-1] + (b'sha1:' + sha1,)

        start, end, type = self._compress(key, chunks, length, length / 2, soft)
        return sha1, start, end, type
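    # Illustrative call (not from the original source): a key whose last
    # element is None gets the computed sha1 spliced in, e.g.
    #   sha1, start, end, kind = compressor.compress(
    #       (b'file-id', None), [b'one\n', b'two\n'], 8, None)
    # stores the text under (b'file-id', b'sha1:<hex>') and returns the
    # offsets of its record within the group.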

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output for identification
            of the text during decompression.

        :param chunks: The chunks of bytes to be compressed

        :param input_len: The length of the chunks

        :param max_delta_size: The size above which we issue a fulltext instead
            of a delta.
        """

        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over chunks and the sha1.
        """
        (start_byte, start_chunk, end_byte,
         end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = b''.join(delta_chunks)
        kind = stored_bytes[:1]
        if kind == b'f':
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = [stored_bytes[offset + 1:]]
        else:
            if kind != b'd':
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
            # XXX: This is inefficient at best
            source = b''.join(self.chunks[:start_chunk])
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = [apply_delta(source, stored_bytes[offset + 1:])]
        data_sha1 = osutils.sha_strings(data)
        return data, data_sha1

        """Finish this group, creating a formatted stream."""

        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        new_lines = osutils.chunks_to_lines(chunks)
        out_lines, index_lines = self._delta_index.make_delta(
            new_lines, bytes_length=input_len, soft=soft)
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = [b'f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = b'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        # Before insertion

        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # record because of 'len'.
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                                 ' somehow the DeltaIndex got out of sync with'
                                 ' the output lines')
        bytes = b''.join(chunks)
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            type = 'fulltext'
            enc_length = encode_base128_int(input_len)
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = [b'f', enc_length] + chunks
        else:
            type = 'delta'
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = [b'd', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        # Before insertion
        start = self.endpoint

        graph_index = BTreeBuilder(reference_lists=ref_length,
                                   key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
                              add_callback=graph_index.add_nodes,
                              inconsistency_fatal=inconsistency_fatal)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
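    # Usage sketch (illustrative only; 'transport' stands for any writable
    # breezy transport, and the argument names mirror the parameters used
    # above rather than a documented API):
    #   factory = make_pack_factory(graph=True, delta=False, keylength=1)
    #   vf = factory(transport)
    #   vf.add_lines((b'rev-id',), [], [b'line one\n'])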

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(
            self._index, self._access, self._delta,
            _unadded_refs=dict(self._unadded_refs),
            _group_cache=self._group_cache)

    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        """
        self._index._check_write_ok()
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)
        return self.add_content(
            ChunkedContentFactory(
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def add_content(self, factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
                    random_id=False):
        """Add a text to the store.

        :param factory: A ContentFactory that can be used to retrieve the key,
            parents and contents.
        :param parent_texts: An optional dictionary containing the opaque
            representations of some or all of the parents of version_id to
            allow delta optimisations. VERY IMPORTANT: the texts must be those
            returned by add_lines or data corruption can be caused.
        :param left_matching_blocks: a hint about which areas are common
            between the text and its left-hand-parent. The format is
            the SequenceMatcher.get_matching_blocks format.
        :param nostore_sha: Raise ExistingContent and do not add the lines to
            the versioned file if the digest of the lines matches this.
        :param random_id: If True a random id has been selected rather than
            an id determined by some deterministic process such as a converter
            from a foreign VCS. When True the backend may choose not to check
            for uniqueness of the resulting key within the versioned file, so
            this should only be done when the result is expected to be unique
            anyway.
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        """
        self._index._check_write_ok()
        parents = factory.parents
        self._check_add(factory.key, random_id)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        sha1, length = list(self._insert_record_stream(
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
        return sha1, length, None
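    # Illustrative example (not part of the original source), given a
    # GroupCompressVersionedFiles instance 'vf':
    #   sha1, num_bytes, _ = vf.add_lines(
    #       (b'key-id',), (), [b'first line\n', b'second line\n'])
    # stores the two lines and returns their sha1 and total length (23 bytes).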

    def add_fallback_versioned_files(self, a_versioned_files):

                key_to_source_map)
        elif ordering == 'as-requested':
            source_keys = self._get_as_requested_source_keys(orig_keys,
                locations, unadded_keys, key_to_source_map)
        else:
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
            # Otherwise we thrash the _group_cache and destroy performance
            source_keys = self._get_io_ordered_source_keys(locations,
                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:

        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []

        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            index, start, length = self._access.add_raw_record(
                None, bytes_len, chunks)
            nodes = []
            for key, reads, refs in keys_to_add:
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]
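            # Index value sketch (informal): each node added above stores
            # b"<block start> <block length> <record start> <record end>"
            # style coordinates, which the index layer later parses back into
            # the offsets needed to read a record out of its block.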

                    raise AssertionError('No insert_manager set')
                if insert_manager is not record._manager:
                    raise AssertionError('insert_manager does not match'
                                         ' the current record, we cannot be positive'
                                         ' that the appropriate content was inserted.')
                value = b"%d %d %d %d" % (block_start, block_length,
                                          record._start, record._end)
                nodes = [(record.key, value, (record.parents,))]
                # TODO: Consider buffering up many nodes to be added, not
                #       sure how much overhead this has, but we're seeing
                self._index.add_records(nodes, random_id=random_id)

            try:
                chunks = record.get_bytes_as('chunked')
            except errors.UnavailableRepresentation:
                adapter_key = record.storage_kind, 'chunked'
                adapter = get_adapter(adapter_key)
                chunks = adapter.get_bytes(record, 'chunked')
            chunks_len = record.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            if max_fulltext_len < chunks_len:
                max_fulltext_len = chunks_len
                max_fulltext_prefix = prefix
            (found_sha1, start_point, end_point,
             type) = self._compressor.compress(
                record.key, chunks, chunks_len, record.sha1, soft=soft,
                nostore_sha=nostore_sha)
            # delta_ratio = float(chunks_len) / (end_point - start_point)
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                    and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4 * 1024 * 1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                    and end_point > 2 * 1024 * 1024):
                start_new_block = True
            else:
                start_new_block = False
            if start_new_block:
                self._compressor.pop_last()
                flush()
                max_fulltext_len = chunks_len
                (found_sha1, start_point, end_point,
                 type) = self._compressor.compress(
                    record.key, chunks, chunks_len, record.sha1)
            if record.key[-1] is None:
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
            else:
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1, chunks_len
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            else:
                parents = None
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append(
                (key, b'%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            flush()
        self._compressor = None
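        # Grouping heuristic sketch (informal summary of the loop above): keep
        # adding texts to the current group while they share a prefix and the
        # output stays under roughly 2 * the largest fulltext seen so far;
        # otherwise a new block is started once the output passes 2MB on a
        # prefix change, or 4MB as a hard cap, re-adding the last text to the
        # freshly created compressor.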

        # but we need to setup a list of records to visit.
        # we need key, position, length
        for key_idx, record in enumerate(self.get_record_stream(keys,
                                                                'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            if pb is not None:
                pb.update('Walking content', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            for line in record.iter_bytes_as('lines'):
                yield line, key
        if pb is not None:
            pb.update('Walking content', total, total)


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
                 add_callback=None, track_external_parent_refs=False,
                 inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback, returns True if the index is locked and
            thus usable.
        :param parents: If True, record knits parents, if not do not record