DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024


class KnitError(InternalBzrError):


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.expected = expected
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams. In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options


class KnitAdapter(object):
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [next(lines).split(b' ', 1)[1]
                            for _ in range(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(b',')]
                contents = [tuple(next(lines).split(b' ', 1))
                            for _ in range(count)]
                result.append((start, end, count, contents))
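        # Illustrative sketch (not part of the original source): each hunk in
        # the serialised line-delta parsed above is a header line of the form
        # b"start,end,count" followed by `count` content lines; in an
        # annotated knit each content line carries an origin id before the
        # first space, e.g.
        #
        #   8,9,2
        #   rev-1 replacement line one
        #   rev-2 replacement line two
        #
        # The plain branch keeps only the text after that first space, the
        # annotated branch keeps (origin, text) pairs; the values shown are
        # hypothetical.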
    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(b' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.
        max_delta_chain = 0
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
                                                    key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        index = _KnitGraphIndex(graph_index, lambda: True, parents=parents,
                                deltas=delta, add_callback=graph_index.add_nodes)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = KnitVersionedFiles(index, access,
                                    max_delta_chain=max_delta_chain)
        result.stream = stream
        result.writer = writer
    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return KnitVersionedFiles(self._index, self._access,
                                  self._max_delta_chain, self._factory.annotated,
                                  self._reload_func)
    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._immediate_fallback_vfs.append(a_versioned_files)

    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """See VersionedFiles.add_lines()."""
        self._index._check_write_ok()
        self._check_add(key, lines, random_id, check_content)
        # The caller might pass None if there is no graph data, but kndx
        # indexes can't directly store that, so we give them
        # an empty tuple instead.
        if parents is None:
            parents = ()
        line_bytes = b''.join(lines)
        return self._add(key, lines, parents,
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
                         line_bytes=line_bytes)
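    # Illustrative sketch (not part of the original module): a typical call
    # passes a key tuple whose elements are bytestrings, the parent keys, and
    # the full text as a list of bytes lines, e.g.
    #
    #   vf.add_lines((b'file-id', b'rev-2'), [(b'file-id', b'rev-1')],
    #                [b'line one\n', b'line two\n'])
    #
    # The names above are hypothetical; see VersionedFiles.add_lines() for the
    # contract.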
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
        """See VersionedFiles._add_text()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if text.__class__ is not str:
            raise errors.BzrBadParameterUnicode("text")
        # The caller might pass None if there is no graph data, but kndx
        # indexes can't directly store that, so we give them
        # an empty tuple instead.
        if parents is None:
            parents = ()
        return self._add(key, None, parents,
                         None, None, nostore_sha, random_id,
                         line_bytes=text)
    def _add(self, key, lines, parents, parent_texts,
             left_matching_blocks, nostore_sha, random_id,
             line_bytes):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.
            lines = lines[:]
            # Replace the last line with one that ends in a final newline
            lines[-1] = lines[-1] + b'\n'
        if lines is None:
            lines = osutils.split_lines(line_bytes)

        for element in key[:-1]:
            if not isinstance(element, bytes):
                raise TypeError("key contains non-bytestrings: %r" % (key,))
        if key[-1] is None:
            key = key[:-1] + (b'sha1:' + digest,)
        elif not isinstance(key[-1], bytes):
            raise TypeError("key contains non-bytestrings: %r" % (key,))
        # Knit hunks are still last-element only
        version_id = key[-1]
        content = self._factory.make(lines, version_id)
        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                                                  parent_texts, delta, self._factory.annotated,
                                                  left_matching_blocks)

        if delta:
            options.append(b'line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, data = self._record_to_data(key, digest,
                                              store_lines)
        else:
            options.append(b'fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ is KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                dense_lines = [line_bytes]
                if no_eol:
                    dense_lines.append(b'\n')
                size, data = self._record_to_data(key, digest,
                                                  lines, dense_lines)
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self._factory.lower_fulltext(content)
                size, data = self._record_to_data(key, digest,
                                                  store_lines)

        access_memo = self._access.add_raw_records([(key, size)], data)[0]
        self._index.add_records(
            ((key, options, access_memo, parents),),
            random_id=random_id)
        """Produce a dictionary of knit records.

        :return: {key:(record, record_details, digest, next)}

            * record: data returned from read_records (a KnitContent object)
            * record_details: opaque information to pass to parse_record
            * digest: SHA1 digest of the full text after all steps are done
            * next: build-parent of the version, i.e. the leftmost ancestor.
              Will be None if the record is not a delta.

        :param keys: The keys to build a map for
        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        raw_map = self._get_record_map_unparsed(keys,
                                                allow_missing=allow_missing)
        return self._raw_map_to_record_map(raw_map)
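    # Illustrative sketch (not part of the original docstring): an entry in the
    # map produced by _get_record_map above looks roughly like
    #
    #   {(b'file-id', b'rev-2'):
    #        (record, record_details, b'<sha1>', (b'file-id', b'rev-1'))}
    #
    # where the last element is the build-parent key (None for a fulltext);
    # the keys shown are hypothetical.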
    def _raw_map_to_record_map(self, raw_map):
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                        ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(
                keys, allow_missing=True)
        else:
            build_details = self._index.get_build_details(keys)
            # map from key to
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                             for key, details in viewitems(build_details))
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys : if we're missing the basis component
        # and are trying to include the delta closure.
            # we need key, position, length
            key_records = []
            build_details = self._index.get_build_details(keys)
            for key, details in viewitems(build_details):
                if key in keys:
                    key_records.append((key, details[0]))
            records_iter = enumerate(self._read_records_iter(key_records))
            for (key_idx, (key, data, sha_value)) in records_iter:
                pb.update(gettext('Walking content'), key_idx, total)
                compression_parent = build_details[key][1]
                if compression_parent is None:
                    line_iterator = self._factory.get_fulltext_content(
                        data)
                else:
                    line_iterator = self._factory.get_linedelta_content(
                        data)
                # Now that we are yielding the data for this key, remove it
                # from the list
                keys.remove(key)
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
            try:
                record_contents = df.readlines()
            except Exception as e:
                raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                                  (data, e.__class__.__name__, str(e)))
            header = record_contents.pop(0)
            rec = self._split_header(header)
            last_line = record_contents.pop()
            if len(record_contents) != int(rec[2]):
                raise KnitCorrupt(self,
                                  'incorrect number of lines %s != %s'
                                  ' for version {%s} %s'
                                  % (len(record_contents), int(rec[2]),
                                     rec[1], record_contents))
            if last_line != b'end %s\n' % rec[1]:
                raise KnitCorrupt(self,
                                  'unexpected version end line %r, wanted %r'
                                  % (last_line, rec[1]))
        return rec, record_contents
    def _read_records_iter(self, records):

    def _record_to_data(self, key, digest, lines, dense_lines=None):
        """Convert key, digest, lines into a raw data block.

        :param key: The key of the record. Currently keys are always serialised
            using just the trailing component.
        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in
            \\n, dense_lines may be a list with one line in it, containing all
            the 1000's lines and their \\n's. Using dense_lines if it is
            already known is a win because the string join to create bytes in
            this function spends less time resizing the final string.
        :return: (len, a BytesIO instance with the raw data ready to read.)
        """
        chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
        chunks.extend(dense_lines or lines)
        chunks.append(b"end " + key[-1] + b"\n")
        for chunk in chunks:
            if not isinstance(chunk, bytes):
                raise AssertionError(
                    'data must be plain bytes was %s' % type(chunk))
        if lines and not lines[-1].endswith(b'\n'):
            raise ValueError('corrupt lines value %r' % lines)
        compressed_bytes = b''.join(tuned_gzip.chunks_to_gzip(chunks))
        return len(compressed_bytes), compressed_bytes
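    # Illustrative sketch (not part of the original source): before gzip
    # compression, _record_to_data above frames a record as
    #
    #   version <version-id> <line-count> <sha1>\n
    #   <content lines...>
    #   end <version-id>\n
    #
    # which is the framing _parse_record checks when it pops the header and
    # the trailing end line; the placeholders are hypothetical.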
    def _split_header(self, line):
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        # the record bytes
        for key, (record_bytes, (method, noeol), next) in viewitems(
                self._raw_record_map):
            key_bytes = b'\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            method_bytes = method.encode('ascii')
            next_bytes = b'\x00'.join(next)
            map_byte_list.append(b'\n'.join(
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                 b'%d' % len(record_bytes), record_bytes]))
        map_bytes = b''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = b'\n'.join(lines)
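        # Illustrative sketch (not part of the original source): each record in
        # the serialised content map is a run of newline-separated fields
        #
        #   <key parts joined by \x00>
        #   <parent keys joined by \t, or None: when unknown>
        #   <method, e.g. fulltext or line-delta>
        #   <noeol flag, T or F>
        #   <build-parent key>
        #   <record byte count>
        #   <record bytes>
        #
        # which is exactly what the parsing loop below reads back, field by
        # field, using the byte count to step over each record body.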
        end = len(bytes)
        while start < end:
            # 1 line with key
            line_end = bytes.find(b'\n', start)
            key = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if line == b'None:':
                parents = None
            else:
                parents = tuple(
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
                    if segment)
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            method = line.decode('ascii')
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            noeol = line == b"T"
            start = line_end + 1
            # one line with next (b'' for None)
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            if not line:
                next = None
            else:
                next = tuple(bytes[start:line_end].split(b'\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find(b'\n', start)
            line = bytes[start:line_end]
            count = int(line)
            start = line_end + 1
            # the record bytes
            record_bytes = bytes[start:start + count]
            start = start + count
            # put it in the map
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
            for key, options, (_, pos, size), parents in path_keys:
                if not all(isinstance(option, bytes) for option in options):
                    raise TypeError(options)
                if parents is None:
                    # kndx indices cannot be parentless.
                    parents = ()
                line = b' '.join([b'\n'
                    + key[-1], b','.join(options), b'%d' % pos, b'%d' % size,
                    self._dictionary_compress(parents), b':'])
                if not isinstance(line, bytes):
                    raise AssertionError(
                        'data must be utf8 was %s' % type(line))
                lines.append(line)
                self._cache_key(key, options, pos, size, parents)
            if len(orig_history):
                self._transport.append_bytes(path, b''.join(lines))
            else:
                self._init_index(path, lines)
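            # Illustrative sketch (not part of the original source): each line
            # written to the .kndx file by the loop above has the
            # space-separated form
            #
            #   <version-id> <options> <pos> <size> <compressed-parents> :
            #
            # e.g. "rev-2 line-delta 1423 72 0 :", where the parent field is
            # whatever _dictionary_compress produced and the values shown are
            # hypothetical.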
        for key in keys:
            if key not in parent_map:
                continue
            method = self.get_method(key)
            if not isinstance(method, str):
                raise TypeError(method)
            parents = parent_map[key]
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = b'no-eol' in self.get_options(key)
            index_memo = self.get_position(key)
            result[key] = (index_memo, compression_parent,
                           parents, (method, noeol))
        return result

    def get_method(self, key):
        """Return compression method of specified key."""
        options = self.get_options(key)
        if b'fulltext' in options:
            return 'fulltext'
        elif b'line-delta' in options:
            return 'line-delta'
        else:
            raise KnitIndexUnknownMethod(self, options)

    def get_options(self, key):
        """Return a list representing options.
    def _split_key(self, key):
        """Split key into a prefix and suffix."""
        # GZ 2018-07-03: This is intentionally either a sequence or bytes?
        if isinstance(key, bytes):
            return key[:-1], key[-1:]
        return key[:-1], key[-1]
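    # Illustrative sketch (not part of the original source): for a tuple key
    # such as (b'file-id', b'rev-1') _split_key returns ((b'file-id',), b'rev-1').
    # The bytes branch slices with key[-1:] because indexing bytes with [-1]
    # would yield an int on Python 3 rather than a one-byte bytes suffix.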
class _KeyRefs(object):

    def __init__(self, track_new_keys=False):
        # dict mapping 'key' to 'set of keys referring to that key'
        self.refs = {}
        if track_new_keys:
            # set remembering all new keys
            self.new_keys = set()
        else:
            self.new_keys = None

    def clear(self):
        self.refs.clear()
        if self.new_keys:
            self.new_keys.clear()

    def add_references(self, key, refs):
        # Record the new references
        for referenced in refs:
            try:
                needed_by = self.refs[referenced]
            except KeyError:
                needed_by = self.refs[referenced] = set()
            needed_by.add(key)
        # Discard references satisfied by the new key
        self.add_key(key)

    def get_new_keys(self):
        return self.new_keys

    def get_unsatisfied_refs(self):
        return self.refs.iterkeys()

    def _satisfy_refs_for_key(self, key):
        try:
            del self.refs[key]
        except KeyError:
            # No keys depended on this key. That's ok.
            pass

    def add_key(self, key):
        # satisfy refs for key, and remember that we've seen this key.
        self._satisfy_refs_for_key(key)
        if self.new_keys is not None:
            self.new_keys.add(key)

    def satisfy_refs_for_keys(self, keys):
        for key in keys:
            self._satisfy_refs_for_key(key)

    def get_referrers(self):
        result = set()
        for referrers in self.refs.itervalues():
            result.update(referrers)
        return result
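# Illustrative sketch (not part of the original module): _KeyRefs records which
# keys are referenced but not yet satisfied, e.g.
#
#   refs = _KeyRefs()
#   refs.add_references((b'rev-2',), [(b'rev-1',)])  # rev-2 builds on rev-1
#   list(refs.get_unsatisfied_refs())                # -> [(b'rev-1',)]
#   refs.add_key((b'rev-1',))                        # rev-1 arrives
#   list(refs.get_unsatisfied_refs())                # -> []
#
# The keys shown are hypothetical.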
class _KnitGraphIndex(object):
    """A KnitVersionedFiles index layered on GraphIndex."""

    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
                 add_callback=None, track_external_parent_refs=False):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of breezy.index.GraphIndex.
        :param is_locked: A callback to check whether the object should answer
        :param deltas: Allow delta-compressed records.

                # Sometimes these are passed as a list rather than a tuple
                passed = static_tuple.as_tuples(keys[key])
                passed_parents = passed[1][:1]
                if (value[0:1] != keys[key][0][0:1]
                        or parents != passed_parents):
                    node_refs = static_tuple.as_tuples(node_refs)
                    raise KnitCorrupt(self, "inconsistent details in add_records"
                                      ": %s %s" % ((value, node_refs), passed))

        if self._parents:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in viewitems(keys):
                result.append((key, value))
        self._add_callback(result)
        if missing_compression_parents:
class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
        self._flush_func = flush_func

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param key_sizes: An iterable of tuples containing the key and size of each
            raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is an
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        if type(raw_data) is not str:
            raise AssertionError(
                'data must be plain bytes was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self._write_index, p_offset, p_length))
        return result

    def flush(self):
        """Flush pending writes on this access object.

        This will flush any buffered writes to a NewPack.
        """
        if self._flush_func is not None:
            self._flush_func()

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for a records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done
                    raise
                raise errors.RetryWithNewPacks(index,
                                               reload_occurred=True,
                                               exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                                               reload_occurred=False,
                                               exc_info=sys.exc_info())

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
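# Illustrative sketch (not part of the original module): callers wrap work that
# touches pack data in a retry loop, mirroring the pattern used above by
# get_record_stream and by the annotator below, so that packs moving
# mid-operation are retried rather than treated as fatal.
#
#   while True:
#       try:
#           return do_work()           # hypothetical operation using `access`
#       except errors.RetryWithNewPacks as e:
#           access.reload_or_raise(e)  # either returns (retry) or re-raises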

def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text)  # bad assumption
                    yield sub_key, text, num_lines
            except errors.RetryWithNewPacks as e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,