DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024


class KnitError(InternalBzrError):

    _fmt = "Knit error"


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename
        self.how = how


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.actual = actual
        self.expected = expected
        self.key = key
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams. In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options
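

# A minimal usage sketch, with a hypothetical filename and message: the
# classes above only store attributes in __init__; InternalBzrError's
# formatting machinery interpolates them into _fmt when the exception is
# stringified.
def _demo_knit_error_message():
    try:
        raise KnitCorrupt('foo.knit', 'truncated record')
    except KnitCorrupt as e:
        return str(e)  # -> 'Knit foo.knit corrupt: truncated record'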


class KnitAdapter(object):
    """Base class for knit record adaption."""


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-ft-gz':
            raise errors.UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, chunks = self._data._record_to_data(
            (rec[1],), rec[3], content.text())
        return b''.join(chunks)


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, target_storage_kind):
        if target_storage_kind != 'knit-delta-gz':
            raise errors.UnavailableRepresentation(
                factory.key, target_storage_kind, factory.storage_kind)
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, chunks = self._data._record_to_data((rec[1],), rec[3], contents)
        return b''.join(chunks)
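

# A minimal sketch of how these adapters are reached in practice: the shared
# adapter_registry (from breezy's versionedfile module) maps
# (source_storage_kind, target_storage_kind) pairs to adapter factories.
# `record` is a hypothetical ContentFactory whose storage_kind is
# 'knit-annotated-ft-gz'; the registry key shown is an assumption based on
# the get_bytes() checks above.
def _demo_unannotate(record):
    adapter_factory = adapter_registry.get(
        (record.storage_kind, 'knit-ft-gz'))
    adapter = adapter_factory(None)  # the factories accept an optional knit
    return adapter.get_bytes(record, 'knit-ft-gz')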


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
                                                             contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        compression_parent = factory.parents[0]
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        if target_storage_kind == 'fulltext':
            return b''.join(basis_content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return basis_content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[-1],
                                                          contents, factory._build_details, None)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to fulltexts."""

    def get_bytes(self, factory, target_storage_kind):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0]
        # XXX: string splitting overhead.
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = basis_entry.get_bytes_as('lines')
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
                                                      factory._build_details, basis_content)
        if target_storage_kind == 'fulltext':
            return b''.join(content.text())
        elif target_storage_kind in ('chunked', 'lines'):
            return content.text()
        raise errors.UnavailableRepresentation(
            factory.key, target_storage_kind, factory.storage_kind)


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._network_bytes is None:
                self._create_network_bytes()
            return self._network_bytes
        if ('-ft-' in self.storage_kind
                and storage_kind in ('chunked', 'fulltext', 'lines')):
            adapter_key = (self.storage_kind, storage_kind)
            adapter_factory = adapter_registry.get(adapter_key)
            adapter = adapter_factory(None)
            return adapter.get_bytes(self, storage_kind)
        if self._knit is not None:
            # Not redundant with direct conversion above - that only handles
            # fulltext cases.
            if storage_kind in ('chunked', 'lines'):
                return self._knit.get_lines(self.key[0])
            elif storage_kind == 'fulltext':
                return self._knit.get_text(self.key[0])
        raise errors.UnavailableRepresentation(self.key, storage_kind,
                                               self.storage_kind)

    def iter_bytes_as(self, storage_kind):
        return iter(self.get_bytes_as(storage_kind))
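

# A minimal sketch of the representations get_bytes_as() can serve, for a
# hypothetical factory wrapping one record: 'fulltext' is a single
# bytestring, while 'chunked' and 'lines' are lists of byte lines.
def _demo_representations(factory):
    full = factory.get_bytes_as('fulltext')   # e.g. b'one\ntwo\n'
    lines = factory.get_bytes_as('lines')     # e.g. [b'one\n', b'two\n']
    assert full == b''.join(lines)
    return full, lines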


class LazyKnitContentFactory(ContentFactory):


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    def parse_line_delta(self, lines, version_id, plain=False):
        result = []
        lines = iter(lines)
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [next(lines).split(b' ', 1)[1]
                            for _ in range(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [tuple(next(lines).split(b' ', 1))
                            for _ in range(count)]
                result.append((start, end, count, contents))
        return result
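
    # A minimal sketch of the wire form parsed above (all values
    # hypothetical): each hunk is a b'start,end,count' header followed by
    # `count` annotated lines of the form b'<origin> <text>\n'.
    def _demo_line_delta_wire_form(self):
        wire = iter([b'0,1,2',
                     b'rev-2 first new line\n',
                     b'rev-2 second new line\n'])
        header = next(wire)
        start, end, count = [int(n) for n in header.split(b',')]
        contents = [next(wire).split(b' ', 1)[1] for _ in range(count)]
        # -> (0, 1, 2, [b'first new line\n', b'second new line\n'])
        return start, end, count, contents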

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(b' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        for header in lines:
            header = header.split(b',')
            count = int(header[2])
            for _ in range(count):
                origin, text = next(lines).split(b' ', 1)
                yield text
    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """See VersionedFiles.add_lines()."""
        self._index._check_write_ok()
        self._check_add(key, lines, random_id, check_content)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        line_bytes = b''.join(lines)
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)

    def add_content(self, content_factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
                    random_id=False):
        """See VersionedFiles.add_content()."""
        self._index._check_write_ok()
        key = content_factory.key
        parents = content_factory.parents
        self._check_add(key, None, random_id, check_content=False)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        lines = content_factory.get_bytes_as('lines')
        line_bytes = content_factory.get_bytes_as('fulltext')
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)
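
    # A minimal sketch of calling add_content() with a ChunkedContentFactory
    # (from breezy's versionedfile module; the import path and constructor
    # signature here are assumptions, so treat this as illustrative only).
    def _demo_add_content(self):
        from breezy.bzr.versionedfile import ChunkedContentFactory
        factory = ChunkedContentFactory(
            (b'rev-1',), (), None, [b'one\n', b'two\n'])  # key, parents, sha1, chunks
        return self.add_content(factory)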

    def _add(self, key, lines, parents, parent_texts,
             left_matching_blocks, nostore_sha, random_id,
             line_bytes):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.
        """
        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                                                  parent_texts, delta, self._factory.annotated,
                                                  left_matching_blocks)

        if delta:
            options.append(b'line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, data = self._record_to_data(key, digest, store_lines)
        else:
            options.append(b'fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ is KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                dense_lines = [line_bytes]
                if no_eol:
                    dense_lines.append(b'\n')
                size, data = self._record_to_data(key, digest,
                                                  lines, dense_lines)
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self._factory.lower_fulltext(content)
                size, data = self._record_to_data(key, digest, store_lines)

        access_memo = self._access.add_raw_record(key, size, data)
        self._index.add_records(
            ((key, options, access_memo, parents),),
            random_id=random_id)
        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                                                                         ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(
                keys, allow_missing=True)
        else:
            build_details = self._index.get_build_details(keys)
            # map from key to
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                             for key, details in viewitems(build_details))
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys : if we're missing the basis component
        # and are trying to include the delta closure.
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
            try:
                record_contents = df.readlines()
            except Exception as e:
                raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                                  (data, e.__class__.__name__, str(e)))
            header = record_contents.pop(0)
            rec = self._split_header(header)
            last_line = record_contents.pop()
            if len(record_contents) != int(rec[2]):
                raise KnitCorrupt(self,
                                  'incorrect number of lines %s != %s'
                                  ' for version {%s} %s'
                                  % (len(record_contents), int(rec[2]),
                                     rec[1], record_contents))
            if last_line != b'end %s\n' % rec[1]:
                raise KnitCorrupt(self,
                                  'unexpected version end line %r, wanted %r'
                                  % (last_line, rec[1]))
        return rec, record_contents

    def _read_records_iter(self, records):
        """Read text records from data file and yield result."""

    def _record_to_data(self, key, digest, lines, dense_lines=None):
        """Convert key, digest, lines into a raw data block.

        :param key: The key of the record. Currently keys are always serialised
            using just the trailing component.
        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in
            \\n, dense_lines may be a list with one line in it, containing all
            the 1000's lines and their \\n's. Using dense_lines if it is
            already known is a win because the string join to create bytes in
            this function spends less time resizing the final string.
        :return: (len, chunked bytestring with compressed data)
        """
        chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
        chunks.extend(dense_lines or lines)
        chunks.append(b"end " + key[-1] + b"\n")
        for chunk in chunks:
            if not isinstance(chunk, bytes):
                raise AssertionError(
                    'data must be plain bytes was %s' % type(chunk))
        if lines and not lines[-1].endswith(b'\n'):
            raise ValueError('corrupt lines value %r' % lines)
        compressed_chunks = tuned_gzip.chunks_to_gzip(chunks)
        return sum(map(len, compressed_chunks)), compressed_chunks
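
    # A minimal round-trip sketch for the record format built above,
    # mirroring _parse_record: gunzip the chunks, then peel off the version
    # header and the end marker.  The key and digest are hypothetical.
    def _demo_record_round_trip(self):
        size, chunks = self._record_to_data(
            (b'rev-1',), b'0' * 40, [b'one\n', b'two\n'])
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(b''.join(chunks))) as df:
            contents = df.readlines()
        header, lines, end = contents[0], contents[1:-1], contents[-1]
        # header == b'version rev-1 2 ' + b'0' * 40 + b'\n'
        # end == b'end rev-1\n'
        return size, header, lines, end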

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self,
                              'unexpected number of elements in record header')
        return rec
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in viewitems(
                self._raw_record_map):
            key_bytes = b'\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            method_bytes = method.encode('ascii')
            if noeol:
                noeol_bytes = b"T"
            else:
                noeol_bytes = b"F"
            if next:
                next_bytes = b'\x00'.join(next)
            else:
                next_bytes = b''
            map_byte_list.append(b'\n'.join(
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                 b'%d' % len(record_bytes), record_bytes]))
        map_bytes = b''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = b'\n'.join(lines)
        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find(b'\n', start)
            key = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'None:':
                parents = None
            else:
                parents = tuple(
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
                    if segment)
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            method = line.decode('ascii')
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            noeol = line == b"T"
            start = line_end + 1
            # one line with next (b'' for None)
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'':
                next = None
            else:
                next = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start + count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
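
    # A minimal sketch of one serialised record-map entry, matching what the
    # serialiser above writes and this loop parses (all values hypothetical):
    # six newline-separated fields followed by the raw record bytes.
    def _demo_record_map_entry(self):
        record_bytes = b'<raw gzipped record>'
        return b'\n'.join(
            [b'file-id\x00rev-1',   # key components, \x00-joined
             b'file-id\x00rev-0',   # parent keys, \t-separated
             b'fulltext',           # method
             b'F',                  # noeol flag: b'T' or b'F'
             b'',                   # next build-parent (b'' for None)
             b'%d' % len(record_bytes),
             record_bytes])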

    def _split_key(self, key):
        """Split key into a prefix and suffix."""
        # GZ 2018-07-03: This is intentionally either a sequence or bytes?
        if isinstance(key, bytes):
            return key[:-1], key[-1:]
        return key[:-1], key[-1]
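
    # Illustrative: for a tuple key the split is
    #   _split_key((b'file-id', b'rev-1')) -> ((b'file-id',), b'rev-1')
    # whereas a bytes key is sliced bytewise, hence the comment above.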


class _KeyRefs(object):

    def __init__(self, track_new_keys=False):
        # dict mapping 'key' to 'set of keys referring to that key'
        self.refs = {}
        if track_new_keys:
            # set remembering all new keys
            self.new_keys = set()
        else:
            self.new_keys = None

    def clear(self):
        self.refs.clear()
        if self.new_keys:
            self.new_keys.clear()

    def add_references(self, key, refs):
        # Record the new references
        for referenced in refs:
            try:
                needed_by = self.refs[referenced]
            except KeyError:
                needed_by = self.refs[referenced] = set()
            needed_by.add(key)
        # Discard references satisfied by the new key
        self.add_key(key)

    def get_new_keys(self):
        return self.new_keys

    def get_unsatisfied_refs(self):
        return iter(self.refs)

    def _satisfy_refs_for_key(self, key):
        try:
            del self.refs[key]
        except KeyError:
            # No keys depended on this key. That's ok.
            pass

    def add_key(self, key):
        # satisfy refs for key, and remember that we've seen this key.
        self._satisfy_refs_for_key(key)
        if self.new_keys is not None:
            self.new_keys.add(key)

    def satisfy_refs_for_keys(self, keys):
        for key in keys:
            self._satisfy_refs_for_key(key)

    def get_referrers(self):
        result = set()
        for referrers in self.refs.values():
            result.update(referrers)
        return result
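

# A minimal usage sketch for _KeyRefs, with hypothetical keys: references
# are recorded per referenced key and discharged as keys are added.
def _demo_key_refs():
    refs = _KeyRefs()
    refs.add_references((b'rev-2',), [(b'rev-1',)])   # rev-2 depends on rev-1
    pending = set(refs.get_unsatisfied_refs())        # {(b'rev-1',)}
    refs.add_key((b'rev-1',))                         # discharges the ref
    return pending, set(refs.get_unsatisfied_refs())  # second set is empty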


class _KnitGraphIndex(object):
    """A KnitVersionedFiles index layered on GraphIndex."""

    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
                 add_callback=None, track_external_parent_refs=False):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback to check whether the object should answer
            queries.
        :param deltas: Allow delta-compressed records.
        """


class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            breezy.repofmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
        self._flush_func = flush_func

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param sizes: An iterable of tuples containing the key and size of each
            raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is an
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        if not isinstance(raw_data, bytes):
            raise AssertionError(
                'data must be plain bytes was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset + size], [])
            offset += size
            result.append((self._write_index, p_offset, p_length))
        return result

    def flush(self):
        """Flush pending writes on this access object.

        This will flush any buffered writes to a NewPack.
        """
        if self._flush_func is not None:
            self._flush_func()

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for a records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done here, so we fail.
                    raise
                raise errors.RetryWithNewPacks(index,
                                               reload_occurred=True,
                                               exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                                               reload_occurred=False,
                                               exc_info=sys.exc_info())

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            # Re-raise the original exception with its traceback (the old
            # Python 2 three-argument raise is spelled with_traceback now).
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_value.with_traceback(exc_traceback)
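

# A minimal sketch of the retry idiom reload_or_raise() supports, mirroring
# get_record_stream above; `access` is a _DirectPackAccess and `do_read` a
# hypothetical callable that may raise RetryWithNewPacks.
def _demo_retry(access, do_read):
    while True:
        try:
            return do_read()
        except errors.RetryWithNewPacks as e:
            # Either the packs were reloaded (retry the loop) or the
            # original exception is re-raised here.
            access.reload_or_raise(e)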


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations."""
        while True:
            try:
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text)  # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks as e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,
                       record_details):