DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024

class KnitError(InternalBzrError):

    _fmt = "Knit error"


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename
        self.how = how


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.actual = actual
        self.expected = expected
        self.key = key
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams. In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options


class KnitAdapter(object):

        # loop to minimise any performance impact
        for header in lines:
            start, end, count = [int(n) for n in header.split(b',')]
            contents = [next(lines).split(b' ', 1)[1]
                        for _ in range(count)]
            result.append((start, end, count, contents))

        for header in lines:
            start, end, count = [int(n) for n in header.split(b',')]
            contents = [tuple(next(lines).split(b' ', 1))
                        for _ in range(count)]
            result.append((start, end, count, contents))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(b' ', 1)[1] for line in lines)
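
    # Content lines in an annotated knit record carry an origin prefix, e.g.
    # (illustrative value) b'rev-id-1 hello world\n'; splitting on the first
    # space leaves just the content portion, b'hello world\n'.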

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

            max_delta_chain = 0
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
                                                    key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        index = _KnitGraphIndex(graph_index, lambda: True, parents=parents,
                                deltas=delta, add_callback=graph_index.add_nodes)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = KnitVersionedFiles(index, access,
                                    max_delta_chain=max_delta_chain)
        result.stream = stream
        result.writer = writer
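        # The pieces wired together above: an in-memory GraphIndex holds the
        # key->position index, the ContainerWriter appends raw records to the
        # 'newpack' write stream, and KnitVersionedFiles layers knit storage
        # over both via _KnitGraphIndex and _DirectPackAccess.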

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return KnitVersionedFiles(self._index, self._access,
                                  self._max_delta_chain, self._factory.annotated,

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._immediate_fallback_vfs.append(a_versioned_files)

    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """See VersionedFiles.add_lines()."""
        self._index._check_write_ok()
        self._check_add(key, lines, random_id, check_content)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        line_bytes = b''.join(lines)
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)
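
    # Illustrative use (hypothetical file and revision ids): keys are tuples
    # of bytes, parents is a list of such keys, and lines are byte strings:
    #   kvf.add_lines((b'file-id', b'rev-2'), [(b'file-id', b'rev-1')],
    #                 [b'first line\n', b'second line\n'])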

    def add_chunks(self, key, parents, chunk_iter, parent_texts=None,
                   left_matching_blocks=None, nostore_sha=None, random_id=False):
        """See VersionedFiles.add_chunks()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        line_bytes = b''.join(chunk_iter)
        return self._add(key, None, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)

    def _add(self, key, lines, parents, parent_texts,
             left_matching_blocks, nostore_sha, random_id,
             line_bytes):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.

            lines = lines[:]
            # Replace the last line with one that ends in a final newline
            lines[-1] = lines[-1] + b'\n'
        if lines is None:
            lines = osutils.split_lines(line_bytes)

        for element in key[:-1]:
            if not isinstance(element, bytes):
                raise TypeError("key contains non-bytestrings: %r" % (key,))
        if key[-1] is None:
            key = key[:-1] + (b'sha1:' + digest,)
        elif not isinstance(key[-1], bytes):
            raise TypeError("key contains non-bytestrings: %r" % (key,))
        # Knit hunks are still last-element only
        version_id = key[-1]
        content = self._factory.make(lines, version_id)

        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                                                  parent_texts, delta, self._factory.annotated,
                                                  left_matching_blocks)

        if delta:
            options.append(b'line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, data = self._record_to_data(key, digest,
                                              store_lines)
        else:
            options.append(b'fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ is KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                dense_lines = [line_bytes]
                if not line_bytes.endswith(b'\n'):
                    dense_lines.append(b'\n')
                size, data = self._record_to_data(key, digest,
                                                  lines, dense_lines)
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self._factory.lower_fulltext(content)
                size, data = self._record_to_data(key, digest,
                                                  store_lines)

        access_memo = self._access.add_raw_records([(key, size)], data)[0]
        self._index.add_records(
            ((key, options, access_memo, parents),),
            random_id=random_id)
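        # At this point the gzipped record is in the data/pack file and the
        # index holds (key, options, access memo, parents); options carries
        # b'fulltext' or b'line-delta', plus b'no-eol' when the text lacked a
        # final newline.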

    def _get_record_map(self, keys, allow_missing=False):
        """Produce a dictionary of knit records.

        :return: {key:(record, record_details, digest, next)}

          * record: data returned from read_records (a KnitContent object)
          * record_details: opaque information to pass to parse_record
          * digest: SHA1 digest of the full text after all steps are done
          * next: build-parent of the version, i.e. the leftmost ancestor.
            Will be None if the record is not a delta.

        :param keys: The keys to build a map for
        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        raw_map = self._get_record_map_unparsed(keys,
                                                allow_missing=allow_missing)
        return self._raw_map_to_record_map(raw_map)
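
    # Illustrative shape of the returned map (hypothetical key):
    #   {(b'file-id', b'rev-2'):
    #        (record, record_details, sha1_digest, (b'file-id', b'rev-1'))}
    # where the last element is the build-parent key, or None for a fulltext.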

    def _raw_map_to_record_map(self, raw_map):

                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(
                        keys, ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)
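                # reload_or_raise() either refreshes the pack list, in which
                # case the surrounding loop retries with whatever keys are
                # still outstanding, or re-raises the original error when no
                # retry is possible.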

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(
                keys, allow_missing=True)
        else:
            build_details = self._index.get_build_details(keys)
            # map from key to
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                             for key, details in viewitems(build_details))
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys: if we're missing the basis component
        # and are trying to include the delta closure.

            # we need key, position, length
            key_records = []
            build_details = self._index.get_build_details(keys)
            for key, details in viewitems(build_details):
                if key in keys:
                    key_records.append((key, details[0]))
            records_iter = enumerate(self._read_records_iter(key_records))
            for (key_idx, (key, data, sha_value)) in records_iter:
                pb.update(gettext('Walking content'), key_idx, total)
                compression_parent = build_details[key][1]
                if compression_parent is None:
                    line_iterator = self._factory.get_fulltext_content(
                        data)
                else:
                    line_iterator = self._factory.get_linedelta_content(
                        data)
                # Now that we are yielding the data for this key, remove it
                # from the list
                keys.remove(key)

        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        try:
            with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
                record_contents = df.readlines()
        except Exception as e:
            raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                              (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self,
                              'incorrect number of lines %s != %s'
                              ' for version {%s} %s'
                              % (len(record_contents), int(rec[2]),
                                 rec[1], record_contents))
        if last_line != b'end %s\n' % rec[1]:
            raise KnitCorrupt(self,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, rec[1]))
        return rec, record_contents
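
    # Once ungzipped, a raw knit record therefore looks like (illustrative
    # values, annotated factory):
    #   b'version rev-id-1 2 <sha1-hex>\n'
    #   b'rev-id-0 first line\n'
    #   b'rev-id-1 second line\n'
    #   b'end rev-id-1\n'
    # i.e. a header naming the version, line count and digest, the content
    # lines, and the end marker checked above.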

    def _read_records_iter(self, records):

    def _record_to_data(self, key, digest, lines, dense_lines=None):
        """Convert key, digest, lines into a raw data block.

        :param key: The key of the record. Currently keys are always serialised
            using just the trailing component.
        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in
            \\n, dense_lines may be a list with one line in it, containing all
            1000 lines and their \\n's. Using dense_lines if it is
            already known is a win because the string join to create bytes in
            this function spends less time resizing the final string.
        :return: (len, the raw gzipped record bytes ready to be written).
        """
        chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
        chunks.extend(dense_lines or lines)
        chunks.append(b"end " + key[-1] + b"\n")
        for chunk in chunks:
            if not isinstance(chunk, bytes):
                raise AssertionError(
                    'data must be plain bytes was %s' % type(chunk))
        if lines and not lines[-1].endswith(b'\n'):
            raise ValueError('corrupt lines value %r' % lines)
        compressed_bytes = b''.join(tuned_gzip.chunks_to_gzip(chunks))
        return len(compressed_bytes), compressed_bytes

    def _split_header(self, line):

        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in viewitems(
                self._raw_record_map):
            key_bytes = b'\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            method_bytes = method.encode('ascii')
            if noeol:
                noeol_bytes = b"T"
            else:
                noeol_bytes = b"F"
            if next:
                next_bytes = b'\x00'.join(next)
            else:
                next_bytes = b''
            map_byte_list.append(b'\n'.join(
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                 b'%d' % len(record_bytes), record_bytes]))
        map_bytes = b''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = b'\n'.join(lines)
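        # Each serialised entry above is, in order and newline-separated:
        # key, parents, method, noeol flag, build-parent ('next'), record
        # length, then the raw record bytes; the parser below walks the same
        # fields back out of the byte string.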

        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find(b'\n', start)
            key = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'None:':
                parents = None
            else:
                parents = tuple(
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
                    if segment)
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            method = line.decode('ascii')
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            noeol = line == b"T"
            start = line_end + 1
            # one line with next (b'' for None)
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'':
                next = None
            else:
                next = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start + count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)

            for key, options, (_, pos, size), parents in path_keys:
                if not all(isinstance(option, bytes) for option in options):
                    raise TypeError(options)
                if parents is None:
                    # kndx indices cannot be parentless.
                    parents = ()
                line = b' '.join([
                    b'\n' + key[-1], b','.join(options), b'%d' % pos, b'%d' % size,
                    self._dictionary_compress(parents), b':'])
                if not isinstance(line, bytes):
                    raise AssertionError(
                        'data must be utf8 was %s' % type(line))
                lines.append(line)
                self._cache_key(key, options, pos, size, parents)
            if len(orig_history):
                self._transport.append_bytes(path, b''.join(lines))
            else:
                self._init_index(path, lines)
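            # An appended index line looks like (illustrative values):
            #   rev-id-2 fulltext,no-eol 1234 210 0 :
            # i.e. version, comma-separated options, byte offset and length in
            # the .knit data file, dictionary-compressed parent references,
            # and a terminating ':'.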

        for key in keys:
            if key not in parent_map:
                continue  # Ghost
            method = self.get_method(key)
            if not isinstance(method, str):
                raise TypeError(method)
            parents = parent_map[key]
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = b'no-eol' in self.get_options(key)
            index_memo = self.get_position(key)
            result[key] = (index_memo, compression_parent,
                           parents, (method, noeol))

    def get_method(self, key):
        """Return compression method of specified key."""
        options = self.get_options(key)
        if b'fulltext' in options:
            return 'fulltext'
        elif b'line-delta' in options:
            return 'line-delta'
        else:
            raise KnitIndexUnknownMethod(self, options)

    def get_options(self, key):
        """Return a list representing options.

    def _split_key(self, key):
        """Split key into a prefix and suffix."""
        # GZ 2018-07-03: This is intentionally either a sequence or bytes?
        if isinstance(key, bytes):
            return key[:-1], key[-1:]
        return key[:-1], key[-1]


class _KeyRefs(object):

    def __init__(self, track_new_keys=False):
        # dict mapping 'key' to 'set of keys referring to that key'
        self.refs = {}
        if track_new_keys:
            # set remembering all new keys
            self.new_keys = set()
        else:
            self.new_keys = None

    def clear(self):
        self.refs.clear()
        if self.new_keys:
            self.new_keys.clear()

    def add_references(self, key, refs):
        # Record the new references
        for referenced in refs:
            try:
                needed_by = self.refs[referenced]
            except KeyError:
                needed_by = self.refs[referenced] = set()
            needed_by.add(key)
        # Discard references satisfied by the new key
        self._satisfy_refs_for_key(key)

    def get_new_keys(self):
        return self.new_keys

    def get_unsatisfied_refs(self):
        return self.refs.iterkeys()

    def _satisfy_refs_for_key(self, key):
        try:
            del self.refs[key]
        except KeyError:
            # No keys depended on this key. That's ok.
            pass

    def add_key(self, key):
        # satisfy refs for key, and remember that we've seen this key.
        self._satisfy_refs_for_key(key)
        if self.new_keys is not None:
            self.new_keys.add(key)

    def satisfy_refs_for_keys(self, keys):
        for key in keys:
            self._satisfy_refs_for_key(key)

    def get_referrers(self):
        result = set()
        for referrers in self.refs.itervalues():
            result.update(referrers)
        return result


class _KnitGraphIndex(object):
    """A KnitVersionedFiles index layered on GraphIndex."""

    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
                 add_callback=None, track_external_parent_refs=False):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback to check whether the object should answer
            queries.
        :param deltas: Allow delta-compressed records.

                    # Sometimes these are passed as a list rather than a tuple
                    passed = static_tuple.as_tuples(keys[key])
                    passed_parents = passed[1][:1]
                    if (value[0:1] != keys[key][0][0:1]
                            or parents != passed_parents):
                        node_refs = static_tuple.as_tuples(node_refs)
                        raise KnitCorrupt(self, "inconsistent details in add_records"
                                          ": %s %s" % ((value, node_refs), passed))

        result = []
        if self._parents:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value))
        self._add_callback(result)
        if missing_compression_parents:


class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
        self._flush_func = flush_func

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param key_sizes: An iterable of tuples containing the key and size of each
            raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is an
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        if type(raw_data) is not str:
            raise AssertionError(
                'data must be plain bytes was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self._write_index, p_offset, p_length))
        return result
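
    # Illustrative use (hypothetical key and sizes): passing
    #   key_sizes=[((b'file-id', b'rev-1'), 120)]
    # together with 120 bytes of raw_data returns one memo of the form
    #   (write_index, pack_offset, pack_length)
    # which get_raw_records() can later resolve back into the raw bytes.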

    def flush(self):
        """Flush pending writes on this access object.

        This will flush any buffered writes to a NewPack.
        """
        if self._flush_func is not None:
            self._flush_func()

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done
                    raise
                raise errors.RetryWithNewPacks(index,
                                               reload_occurred=True,
                                               exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                                               reload_occurred=False,
                                               exc_info=sys.exc_info())
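
        # Both failure paths above raise RetryWithNewPacks; callers such as
        # get_record_stream() hand that exception to reload_or_raise() below
        # and, if it returns, simply retry the read against the new packs.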

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback

# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text)  # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks as e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,