from ..lazy_import import lazy_import
lazy_import(globals(), """
    graph as _mod_graph,
from breezy.bzr import (
from breezy.i18n import gettext

from .btree_index import BTreeBuilder
from ..lru_cache import LRUSizeCache
from ..sixish import (
from .versionedfile import (
    AbsentContentFactory,
# osutils.sha_string('')
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'


def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.

    # groupcompress ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    for key, value in viewitems(parent_map):
        if isinstance(key, bytes) or len(key) == 1:
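            # Keys with a single element (or plain byte strings) all share one
            # prefix group; multi-element keys are grouped by their first
            # element, i.e. the file-id mentioned above.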

    return present_keys


class DecompressCorruption(errors.BzrError):

    _fmt = "Corruption while decompressing repository file%(orig_error)s"

    def __init__(self, orig_error=None):
        if orig_error is not None:
            self.orig_error = ", %s" % (orig_error,)
        errors.BzrError.__init__(self)


# The max zlib window size is 32kB, so if we set 'max_size' output of the
# decompressor to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.


    # Group Compress Block v1 Zlib
    GCB_HEADER = b'gcb1z\n'
    # Group Compress Block v1 Lzma
    GCB_LZ_HEADER = b'gcb1l\n'
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
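    # A serialised block is one of these headers followed by two ascii decimal
    # lengths (compressed then uncompressed, each '\n'-terminated) and then the
    # compressed content, e.g. b'gcb1z\n<z_content_length>\n<content_length>\n<z_content>'.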

    def __init__(self):

        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = b''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = b''.join(self._z_content_chunks)

            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet

                # The stream is finished
                self._z_content_decompressor = None

    def _parse_bytes(self, data, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = data.index(b'\n', pos, pos + 14)
        self._z_content_length = int(data[pos:pos2])
        pos2 = data.index(b'\n', pos, pos + 14)
        self._content_length = int(data[pos:pos2])
        if len(data) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(data), pos, self._z_content_length))
        self._z_content_chunks = (data[pos:],)

    def _z_content(self):

        Meant only to be used by the test suite.

        if self._z_content_chunks is not None:
            return b''.join(self._z_content_chunks)

    def from_bytes(cls, bytes):
        if header not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        if header == cls.GCB_HEADER:
            out._compressor_name = 'zlib'
        elif header == cls.GCB_LZ_HEADER:
            out._compressor_name = 'lzma'
            raise ValueError('unknown compressor: %r' % (header,))
        out._parse_bytes(bytes, 6)

        :return: The bytes for the content
        if start == end == 0:
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
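        # e.g. a 5-byte fulltext is stored as b'f' + b'\x05' + the text itself;
        # a delta record uses b'd' followed by its own base128-encoded length.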
        c = self._content[start:start + 1]
            type = 'fulltext'
            raise ValueError('Unknown content control code: %s'
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                             ' %s != %s' % (end, content_start + content_len))
            return self._content[content_start:end]
        # Must be type delta as checked above
        return apply_delta_to_source(self._content, content_start, end)

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""

        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = list(map(compressor.compress, chunks))
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]

        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        header = self.GCB_HEADER
        chunks = [b'%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
        chunks.extend(self._z_content_chunks)

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return b''.join(chunks)

    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.

        while pos < self._content_length:
            kind = self._content[pos:pos + 1]
            if kind not in (b'f', b'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == b'f': # Fulltext
                    text = self._content[pos:pos+content_len]
                    result.append((b'f', content_len, text))
                    result.append((b'f', content_len))
            elif kind == b'd': # Delta
                delta_content = self._content[pos:pos+content_len]
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append((b'd', content_len, decomp_len, delta_info))
                while delta_pos < content_len:
                    c = indexbytes(delta_content, delta_pos)
                    if c & 0x80: # Copy
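                        # Commands with the high bit set copy a range from the
                        # source; otherwise c is the number of literal bytes
                        # that follow as an insert.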
                            text = self._content[offset:offset+length]
                            delta_info.append((b'c', offset, length, text))
                            delta_info.append((b'c', offset, length))
                        measured_len += length
                            txt = delta_content[delta_pos:delta_pos+c]
                        delta_info.append((b'i', c, txt))
                        measured_len += c
                if delta_pos != content_len:

                # wire bytes, something...
                return self._manager._wire_bytes()

        if storage_kind in ('fulltext', 'chunked'):
            if self._bytes is None:
                # Grab and cache the raw bytes for this entry
                    self._manager._prepare_for_extract()
                except zlib.error as value:
                    raise DecompressCorruption("zlib: " + str(value))
                block = self._manager._block
                self._bytes = block.extract(self.key, self._start, self._end)
                # There are code paths that first extract as fulltext, and then

        # <length of gc block>\n
        lines = [b'groupcompress-block\n']
        # The minimal info we need is the key, the start offset, and the
        # parents. The length and type are encoded in the record itself.
        # However, passing in the other bits makes it easier. The list of
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = b'\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = b'None:'
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            record_header = b'%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = b''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append(b'%d\n%d\n%d\n' % (
            z_header_bytes_len, header_bytes_len, block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return b''.join(lines)

    def from_bytes(cls, bytes):
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split(b'\n', 4)
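        # The first four newline-separated fields are the storage-kind marker
        # and the three lengths written by _wire_bytes() above; everything
        # after the fourth '\n' is the compressed header plus the block itself.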
        if storage_kind != b'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:

        # So now we have a valid GCB, we just need to parse the factories that
        # were sent to us
        header_lines = header.split(b'\n')
        last = header_lines.pop()
            raise ValueError('header lines did not end with a trailing'
        if len(header_lines) % 4 != 0:
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in range(0, len(header_lines), 4):
            key = tuple(header_lines[start].split(b'\x00'))
            parents_line = header_lines[start+1]
            if parents_line == b'None:':
                parents = tuple([tuple(segment.split(b'\x00'))
                                 for segment in parents_line.split(b'\t')
            start_offset = int(header_lines[start+2])
            end_offset = int(header_lines[start+3])
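            # i.e. each factory is described by exactly four header lines:
            # key, parents, start offset and end offset within the block.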

            if sha1 == nostore_sha:
                raise errors.ExistingContent()
        if key[-1] is None:
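            # A key ending in None gets its final element replaced with a
            # b'sha1:...' marker derived from the content's sha1.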
            key = key[:-1] + (b'sha1:' + sha1,)

        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
        return sha1, start, end, type

        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = b''.join(delta_chunks)
        kind = stored_bytes[:1]
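        # Stored records start with b'f' (fulltext) or b'd' (delta), followed
        # by a base128-encoded length and then the payload.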
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = stored_bytes[offset + 1:]

                raise ValueError('Unknown content kind, bytes claim %s' % kind)
            # XXX: This is inefficient at best
            source = b''.join(self.chunks[:start_chunk])
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            data = apply_delta(source, stored_bytes[offset + 1:])
        data_sha1 = osutils.sha_string(data)
        return data, data_sha1

        """Finish this group, creating a formatted stream.

        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = [b'f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
            # this is a worthy delta, output it
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        # Before insertion
            enc_length = encode_base128_int(len(bytes))
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = [b'f', enc_length, bytes]

            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = [b'd', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        # Before insertion
        start = self.endpoint

                    if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                        # The next block from _get_blocks will be the block we
                        block_read_memo, block = next(blocks)
                        if block_read_memo != read_memo:
                            raise AssertionError(
                                "block_read_memo out of sync with read_memo"

                                               nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = next(raw_records)
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block

                    remaining_keys.discard(content_factory.key)
                    yield content_factory
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _find_from_fallback(self, missing):

                # This is the group the bytes are stored in, followed by the
                # location in the group
                return locations[key][0]
            # We don't have an ordering for keys in the in-memory object, but
            # lets process the in-memory ones first.
            present_keys = list(unadded_keys)
            present_keys.extend(sorted(locations, key=get_group))
            # Now grab all of the ones from other sources
            source_keys = [(self, present_keys)]
            source_keys.extend(source_result)

            # start with one key, recurse to its oldest parent, then grab
            # everything in the same group, etc.
            parent_map = dict((key, details[2]) for key, details in
                              viewitems(locations))
            for key in unadded_keys:
                parent_map[key] = self._unadded_refs[key]
            parent_map.update(fallback_parent_map)

            # the fulltext content at this point. Note that sometimes we
            # will want it later (streaming CHK pages), but most of the
            # time we won't (everything else)
            data = b''.join(chunks)
            index, start, length = self._access.add_raw_records(
                [(None, len(data))], data)[0]
            for key, reads, refs in keys_to_add:
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]

                        ' the current record, we cannot be positive'
                        ' that the appropriate content was inserted.'
                value = b"%d %d %d %d" % (block_start, block_length,
                                          record._start, record._end)
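                # i.e. the group's start and length in the pack, then the
                # record's start and end offsets inside the group.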
                nodes = [(record.key, value, (record.parents,))]
                # TODO: Consider buffering up many nodes to be added, not

             type) = self._compressor.compress(record.key, bytes,
            if record.key[-1] is None:
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
                key = record.key
            self._unadded_refs[key] = record.parents
            refs = static_tuple.StaticTuple(parents)
                (key, b'%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            self._compressor = None

                 inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback, returns True if the index is locked and
        :param parents: If True, record knits parents, if not do not record

                if node_refs != passed[1]:
                    details = '%s %s %s' % (key, (value, node_refs), passed)
                    if self._inconsistency_fatal:
                        raise knit.KnitCorrupt(self, "inconsistent details"
                                               " in add_records: %s" %

        if self._parents:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value, node_refs))
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value))
        records = result
        key_dependencies = self._key_dependencies

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(b' ')
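        # Four space-separated ascii integers, as written by add_records:
        # group start, group length, then the record's start and end offsets.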
2152
bits = node[2].split(' ')
2158
2153
# It would be nice not to read the entire gzip.
2159
2154
# start and stop are put into _int_cache because they are very common.
2160
2155
# They define the 'group' that an entry is in, and many groups can have