DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024
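
# A knit is backed by a pair of files: '<name>.knit' holds the gzipped record
# data and '<name>.kndx' the index.  _STREAM_MIN_BUFFER_SIZE is, as best we
# can tell from its callers, the minimum number of bytes buffered at a time
# when streaming raw records; treat that reading as an inference, not
# documented API.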


class KnitError(InternalBzrError):

    _fmt = "Knit error"


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename
        self.how = how


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s sha'd %(content)s text\n")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.actual = actual
        self.expected = expected
        self.key = key
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams.  In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options


class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = KnitVersionedFiles(None, None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-ft-gz':
            raise UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, chunks = self._data._record_to_data(
            (rec[1],), rec[3], content.text())
        return b''.join(chunks)
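

# Usage sketch for the adapters in this module (illustrative only; ``record``
# is a hypothetical knit record, while adapter_registry, the storage-kind
# strings and get_bytes mirror KnitContentFactory.get_bytes_as below):
#
#   adapter_key = ('knit-annotated-ft-gz', 'knit-ft-gz')
#   adapter_factory = adapter_registry.get(adapter_key)
#   adapter = adapter_factory(None)  # fulltext adapters need no basis_vf
#   plain_gz_bytes = adapter.get_bytes(record, 'knit-ft-gz')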


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-delta-gz':
            raise UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, chunks = self._data._record_to_data((rec[1],), rec[3], contents)
        return b''.join(chunks)


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
                                                             contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        compression_parent = factory.parents[0]
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        if target_storage_kind == 'fulltext':
            return b''.join(basis_content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return basis_content.text()
        raise UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)
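

# Note: the delta-to-fulltext adapters above and below must be constructed
# with a basis VersionedFiles (stored as self._basis_vf by KnitAdapter) so the
# compression parent's text can be fetched and the line delta replayed on top
# of it; the fulltext adapters can be built with basis_vf=None.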


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[-1],
                                                          contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0]
        # XXX: string splitting overhead.
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
                                                      factory._build_details, basis_content)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class KnitContentFactory(ContentFactory):

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._network_bytes is None:
                self._create_network_bytes()
            return self._network_bytes
        if ('-ft-' in self.storage_kind
                and storage_kind in ('chunked', 'fulltext', 'lines')):
            adapter_key = (self.storage_kind, storage_kind)
            adapter_factory = adapter_registry.get(adapter_key)
            adapter = adapter_factory(None)
            return adapter.get_bytes(self, storage_kind)
        if self._knit is not None:
            # Not redundant with direct conversion above - that only handles
            # fulltext cases.
            if storage_kind in ('chunked', 'lines'):
                return self._knit.get_lines(self.key[0])
            elif storage_kind == 'fulltext':
                return self._knit.get_text(self.key[0])
        raise UnavailableRepresentation(self.key, storage_kind,
                                        self.storage_kind)

    def iter_bytes_as(self, storage_kind):
        return iter(self.get_bytes_as(storage_kind))
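
    # Illustrative consumer (``repo`` and ``keys`` are hypothetical): when the
    # stream is created with include_delta_closure=True every present record
    # can be materialised as text, e.g.:
    #
    #   for record in repo.texts.get_record_stream(keys, 'unordered', True):
    #       if record.storage_kind == 'absent':
    #           continue
    #       lines = record.get_bytes_as('lines')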


class LazyKnitContentFactory(ContentFactory):
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [next(lines).split(b' ', 1)[1]
                            for _ in range(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [tuple(next(lines).split(b' ', 1))
                            for _ in range(count)]
                result.append((start, end, count, contents))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(b' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        line_bytes = b''.join(lines)
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)

    def add_content(self, content_factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
                    random_id=False):
        """See VersionedFiles.add_content()."""
        self._index._check_write_ok()
        key = content_factory.key
        parents = content_factory.parents
        self._check_add(key, None, random_id, check_content=False)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        lines = content_factory.get_bytes_as('lines')
        line_bytes = content_factory.get_bytes_as('fulltext')
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)
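
    # add_content pulls both the 'lines' and the joined 'fulltext'
    # representations from the factory, so _add can hash and serialise the
    # already-joined bytes without re-joining the lines itself.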

    def _add(self, key, lines, parents, parent_texts,
             left_matching_blocks, nostore_sha, random_id,
             line_bytes):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.
        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                                                  parent_texts, delta, self._factory.annotated,
                                                  left_matching_blocks)

        if delta:
            options.append(b'line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, data = self._record_to_data(key, digest, store_lines)
        else:
            options.append(b'fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ is KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                dense_lines = [line_bytes]
                if lines and not lines[-1].endswith(b'\n'):
                    dense_lines.append(b'\n')
                size, data = self._record_to_data(key, digest,
                                                  lines, dense_lines)
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self._factory.lower_fulltext(content)
                size, data = self._record_to_data(key, digest, store_lines)

        access_memo = self._access.add_raw_record(key, size, data)
        self._index.add_records(
            ((key, options, access_memo, parents),),
            random_id=random_id)

                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                                                                          ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(
                keys, allow_missing=True)
        else:
            build_details = self._index.get_build_details(keys)
            # map from key to
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                             for key, details in build_details.items())
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys : if we're missing the basis component
        # and are trying to include the delta closure.
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
            try:
                record_contents = df.readlines()
            except Exception as e:
                raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                                  (data, e.__class__.__name__, str(e)))
            header = record_contents.pop(0)
            rec = self._split_header(header)
            last_line = record_contents.pop()
            if len(record_contents) != int(rec[2]):
                raise KnitCorrupt(self,
                                  'incorrect number of lines %s != %s'
                                  ' for version {%s} %s'
                                  % (len(record_contents), int(rec[2]),
                                     rec[1], record_contents))
            if last_line != b'end %s\n' % rec[1]:
                raise KnitCorrupt(self,
                                  'unexpected version end line %r, wanted %r'
                                  % (last_line, rec[1]))
        return rec, record_contents
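
    # For reference (inferred from the checks above rather than a separate
    # spec): each record decompresses to a 'version <key> <line-count>
    # <digest>' header line, <line-count> content lines, and a closing
    # 'end <key>' line.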

    def _read_records_iter(self, records):
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in (
                self._raw_record_map.items()):
            key_bytes = b'\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            method_bytes = method.encode('ascii')
            if noeol:
                noeol_bytes = b"T"
            else:
                noeol_bytes = b"F"
            if next:
                next_bytes = b'\x00'.join(next)
            else:
                next_bytes = b''
            map_byte_list.append(b'\n'.join(
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                 b'%d' % len(record_bytes), record_bytes]))
        map_bytes = b''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = b'\n'.join(lines)
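        # Recap of the per-record layout serialised above: key, parents,
        # method, noeol flag, build parent ('next') and the byte count each on
        # their own line, followed by exactly that many raw record bytes.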
        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find(b'\n', start)
            key = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'None:':
                parents = None
            else:
                parents = tuple(
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
                    if segment)
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            method = line.decode('ascii')
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            noeol = line == b"T"
            start = line_end + 1
            # one line with next (b'' for None)
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'':
                next = None
            else:
                next = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start + count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
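        # The fields parsed here mirror, in order, the ones serialised by
        # _wire_bytes above; any change to one side must be mirrored in the
        # other.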
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text)  # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks as e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,