DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024


class KnitError(InternalBzrError):

    _fmt = "Knit error"


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename
        self.how = how


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.actual = actual
        self.expected = expected
        self.key = key
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams. In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options
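
# An illustrative note (an assumption about the error base class, not code
# from this module): BzrError-style errors interpolate _fmt against the
# attributes their __init__ stores, so constructing and stringifying one of
# the errors above looks roughly like:
#
#     err = KnitHeaderError(b'# bogus header\n', 'foo.kndx')
#     str(err)   # -> Knit header error: b'# bogus header\n' unexpected
#                #    for file "foo.kndx".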


class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = KnitVersionedFiles(None, None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-ft-gz':
            raise errors.UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, chunks = self._data._record_to_data(
            (rec[1],), rec[3], content.text())
        return b''.join(chunks)


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-delta-gz':
            raise errors.UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, chunks = self._data._record_to_data((rec[1],), rec[3], contents)
        return b''.join(chunks)


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to fulltext."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
                                                             contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to fulltext."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        compression_parent = factory.parents[0]
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        if target_storage_kind == 'fulltext':
            return b''.join(basis_content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return basis_content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to fulltext."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[-1],
                                                          contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to fulltext."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0]
        # XXX: string splitting overhead.
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        content, _ = self._plain_factory.parse_record(rec[1], contents,
                                                      factory._build_details, basis_content)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)
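

# A minimal usage sketch (an assumption based on the classes above: the
# (source, target) adapter keys are registered with adapter_registry
# elsewhere, as KnitContentFactory below relies on). Converting one raw
# annotated fulltext record to its plain serialised form:
#
#     adapter = FTAnnotatedToUnannotated(None)   # no basis texts needed
#     plain_gz = adapter.get_bytes(record, 'knit-ft-gz')
#
# The Delta*ToFullText adapters are instead constructed with a basis
# VersionedFiles, since they must fetch the compression parent to expand
# their delta.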


class KnitContentFactory(ContentFactory):

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._network_bytes is None:
                self._create_network_bytes()
            return self._network_bytes
        if ('-ft-' in self.storage_kind
                and storage_kind in ('chunked', 'fulltext', 'lines')):
            adapter_key = (self.storage_kind, storage_kind)
            adapter_factory = adapter_registry.get(adapter_key)
            adapter = adapter_factory(None)
            return adapter.get_bytes(self, storage_kind)
        if self._knit is not None:
            # Not redundant with direct conversion above - that only handles
            # fulltext cases.
            if storage_kind in ('chunked', 'lines'):
                return self._knit.get_lines(self.key[0])
            elif storage_kind == 'fulltext':
                return self._knit.get_text(self.key[0])
        raise errors.UnavailableRepresentation(self.key, storage_kind,
                                               self.storage_kind)

    def iter_bytes_as(self, storage_kind):
        return iter(self.get_bytes_as(storage_kind))
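
    # A hedged usage sketch: given a factory whose storage_kind is, say,
    # 'knit-annotated-ft-gz', callers may ask for any representation an
    # adapter or the backing knit can produce; anything else raises
    # UnavailableRepresentation.
    #
    #     text = factory.get_bytes_as('fulltext')   # one bytes object
    #     lines = factory.get_bytes_as('lines')     # list of bytes lines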


class LazyKnitContentFactory(ContentFactory):
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [next(lines).split(b' ', 1)[1]
                            for _ in range(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [tuple(next(lines).split(b' ', 1))
                            for _ in range(count)]
                result.append((start, end, count, contents))
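
        # A hedged worked example of the hunk format parsed above (derived
        # from this parsing code, not an authoritative spec): each hunk is a
        # b'start,end,count' header replacing basis lines [start:end) with
        # the `count` lines that follow, each line in annotated knits being
        # prefixed with its origin revision and a space:
        #
        #     1,2,2
        #     rev-1 replacement line a
        #     rev-2 replacement line b
        #
        # The plain branch keeps only the text after the first space; the
        # annotated branch keeps (origin, text) tuples.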

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(b' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        line_bytes = b''.join(lines)
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)

    def add_content(self, content_factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
                    random_id=False):
        """See VersionedFiles.add_content()."""
        self._index._check_write_ok()
        key = content_factory.key
        parents = content_factory.parents
        self._check_add(key, None, random_id, check_content=False)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        lines = content_factory.get_bytes_as('lines')
        line_bytes = content_factory.get_bytes_as('fulltext')
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)

    def _add(self, key, lines, parents, parent_texts,
             left_matching_blocks, nostore_sha, random_id,
             line_bytes):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.
        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                                                  parent_texts, delta, self._factory.annotated,
                                                  left_matching_blocks)

        if delta:
            options.append(b'line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, data = self._record_to_data(key, digest, store_lines)
        else:
            options.append(b'fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ is KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                dense_lines = [line_bytes]
                if lines and not lines[-1].endswith(b'\n'):
                    dense_lines.append(b'\n')
                size, data = self._record_to_data(key, digest,
                                                  lines, dense_lines)
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self._factory.lower_fulltext(content)
                size, data = self._record_to_data(key, digest, store_lines)

        access_memo = self._access.add_raw_record(key, size, data)
        self._index.add_records(
            ((key, options, access_memo, parents),),
            random_id=random_id)
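
        # Illustrative summary (restating the branching above, not new
        # policy): every record is stored either as a b'line-delta' against
        # its compression parent or as a b'fulltext', and the chosen option
        # travels with the index entry alongside the access memo so readers
        # know how to reconstruct the text later.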

        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                        ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(
                keys, allow_missing=True)
        else:
            build_details = self._index.get_build_details(keys)
            # map from key to
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                             for key, details in build_details.items())
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys : if we're missing the basis component
        # and are trying to include the delta closure.

        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
            try:
                record_contents = df.readlines()
            except Exception as e:
                raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                                  (data, e.__class__.__name__, str(e)))
            header = record_contents.pop(0)
            rec = self._split_header(header)
            last_line = record_contents.pop()
            if len(record_contents) != int(rec[2]):
                raise KnitCorrupt(self,
                                  'incorrect number of lines %s != %s'
                                  ' for version {%s} %s'
                                  % (len(record_contents), int(rec[2]),
                                     rec[1], record_contents))
            if last_line != b'end %s\n' % rec[1]:
                raise KnitCorrupt(self,
                                  'unexpected version end line %r, wanted %r'
                                  % (last_line, rec[1]))
        return rec, record_contents
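
    # A hedged sketch of the on-disk record this method decodes (inferred
    # from the checks above, not a formal spec): after gunzipping, a record
    # looks like
    #
    #     version <version-id> <line-count> <sha1>\n   <- header, _split_header
    #     ...<line-count> content lines...
    #     end <version-id>\n                           <- trailer checked above
    #
    # so rec[1] is the version id, rec[2] the expected line count and
    # rec[3] the digest.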

    def _read_records_iter(self, records):

        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in (
                self._raw_record_map.items()):
            key_bytes = b'\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            method_bytes = method.encode('ascii')
            if noeol:
                noeol_bytes = b"T"
            else:
                noeol_bytes = b"F"
            if next:
                next_bytes = b'\x00'.join(next)
            else:
                next_bytes = b''
            map_byte_list.append(b'\n'.join(
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                 b'%d' % len(record_bytes), record_bytes]))
        map_bytes = b''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = b'\n'.join(lines)
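
        # A hedged illustration of one serialised map entry (derived from
        # the join above, not an authoritative spec): six newline-separated
        # fields, then the raw record bytes appended directly with no
        # trailing separator, e.g.
        #
        #     file-id\x00rev-1     <- key, b'\x00'-joined
        #     file-id\x00rev-0     <- parents, b'\t' between keys, or b'None:'
        #     line-delta           <- method
        #     F                    <- noeol flag (b'T' or b'F')
        #     file-id\x00rev-0     <- build parent (b'' for None)
        #     1234                 <- byte count of the record bytes
        #     <1234 raw record bytes>
        #
        # The parser below walks this layout back in the same field order.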

        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find(b'\n', start)
            key = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'None:':
                parents = None
            else:
                parents = tuple(
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
                    if segment)
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            method = line.decode('ascii')
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            noeol = line == b"T"
            start = line_end + 1
            # one line with next (b'' for None)
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if not line:
                next = None
            else:
                next = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start + count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)

            records, ann_keys = self._get_build_graph(key)
            for idx, (sub_key, text, num_lines) in enumerate(
                    self._extract_texts(records)):
                if pb is not None:
                    pb.update(gettext('annotating'), idx, len(records))
                yield sub_key, text, num_lines
            for sub_key in ann_keys:
                text = self._text_cache[sub_key]
                num_lines = len(text)  # bad assumption
                yield sub_key, text, num_lines
            return
        except errors.RetryWithNewPacks as e:
            self._vf._access.reload_or_raise(e)
            # The cached build_details are no longer valid
            self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,