@@ -194 +196 @@
             [compression_parent], 'unordered', True).next()
         if basis_entry.storage_kind == 'absent':
             raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
-        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
+        basis_chunks = basis_entry.get_bytes_as('chunked')
+        basis_lines = osutils.chunks_to_lines(basis_chunks)
         # Manually apply the delta because we have one annotated content and
         ...
         basis_content = PlainKnitContent(basis_lines, compression_parent)
@@ -227 +230 @@
             [compression_parent], 'unordered', True).next()
         if basis_entry.storage_kind == 'absent':
             raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
-        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
+        basis_chunks = basis_entry.get_bytes_as('chunked')
+        basis_lines = osutils.chunks_to_lines(basis_chunks)
         basis_content = PlainKnitContent(basis_lines, compression_parent)
         # Manually apply the delta because we have one annotated content and
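Both hunks above replace the fulltext round trip (get_bytes_as('fulltext') plus split_lines) with the new 'chunked' representation. A minimal stand-alone sketch of the contract the new code relies on, assuming only the standard library (this is a naive stand-in for osutils.chunks_to_lines, not its real implementation):

def naive_chunks_to_lines(chunks):
    # A 'chunked' representation is just a sequence of strings whose
    # concatenation is the fulltext; chunk boundaries need not fall on
    # newline boundaries.
    return ''.join(chunks).splitlines(True)

# Three chunks that split mid-line still normalise to two complete lines.
assert naive_chunks_to_lines(['ab', 'c\nde', 'f\n']) == ['abc\n', 'def\n']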
@@ -274 +278 @@
     def get_bytes_as(self, storage_kind):
         if storage_kind == self.storage_kind:
             return self._raw_record
-        if storage_kind == 'fulltext' and self._knit is not None:
-            return self._knit.get_text(self.key[0])
-        raise errors.UnavailableRepresentation(self.key, storage_kind,
+        if self._knit is not None:
+            if storage_kind == 'chunked':
+                return self._knit.get_lines(self.key[0])
+            elif storage_kind == 'fulltext':
+                return self._knit.get_text(self.key[0])
+        raise errors.UnavailableRepresentation(self.key, storage_kind,
             ...

 class KnitContent(object):
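For context, a stand-alone illustration of the representation negotiation get_bytes_as() performs: serve the native storage kind directly, convert when a cheap conversion exists, and refuse anything else. The class, exception and kinds below are toys invented for this sketch, not bzrlib API:

class UnavailableRepresentation(Exception):
    pass

class ToyContentFactory(object):
    def __init__(self, key, text):
        self.key = key
        self.storage_kind = 'fulltext'
        self._text = text

    def get_bytes_as(self, storage_kind):
        # Serve the native representation directly ...
        if storage_kind == self.storage_kind:
            return self._text
        # ... convert where a cheap conversion exists ...
        if storage_kind == 'chunked':
            # 'chunked' is a list of strings; a list of lines satisfies it.
            return self._text.splitlines(True)
        # ... and refuse anything else.
        raise UnavailableRepresentation((self.key, storage_kind))

factory = ToyContentFactory(('file-id', 'rev-1'), 'one\ntwo\n')
assert factory.get_bytes_as('chunked') == ['one\n', 'two\n']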
@@ -1106 +1118 @@
         :param allow_missing: If some records are missing, rather than
             error, just return the data that could be generated.
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-            in position_map.iteritems()]
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
        ...
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                records = [(key, i_m) for key, (r, i_m, n)
+                    in position_map.iteritems()]
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)

+    def _split_by_prefix(self, keys):
+        """For the given keys, split them up based on their prefix.
+
+        To keep memory pressure somewhat under control, split the
+        requests back into per-file-id requests, otherwise "bzr co"
+        extracts the full tree into memory before writing it to disk.
+        This should be revisited if _get_content_maps() can ever cross
            ...
+        :param keys: An iterable of key tuples
+        :return: A dict of {prefix: [key_list]}
        ...
+        split_by_prefix = {}
        ...
+                split_by_prefix.setdefault('', []).append(key)
                ...
+                split_by_prefix.setdefault(key[0], []).append(key)
+        return split_by_prefix
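A quick illustration of what the new _split_by_prefix() produces, using a hypothetical stand-alone helper with the same shape (not the bzrlib method itself):

def split_by_prefix(keys):
    grouped = {}
    for key in keys:
        # Knit keys are tuples; for prefixed (per-file) keys the first element
        # is the file-id, while length-1 keys fall into the '' bucket.
        prefix = key[0] if len(key) > 1 else ''
        grouped.setdefault(prefix, []).append(key)
    return grouped

keys = [('file-a', 'rev-1'), ('file-b', 'rev-1'), ('file-a', 'rev-2')]
assert split_by_prefix(keys) == {
    'file-a': [('file-a', 'rev-1'), ('file-a', 'rev-2')],
    'file-b': [('file-b', 'rev-1')],
}

The record-stream code then builds content maps one bucket at a time, so only a single file's texts are held in memory at once.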
@@ -1121 +1164 @@
     def get_record_stream(self, keys, ordering, include_delta_closure):
         """Get a stream of records for keys.
        ...
         if not self._index.has_graph:
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
+        remaining_keys = keys
            ...
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                        ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)

+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
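The wrapper above turns get_record_stream() into a resumable generator: a key is only dropped from remaining_keys once its factory has actually been yielded, so after a RetryWithNewPacks the next pass asks only for what the consumer has not yet seen. A stand-alone sketch of that pattern (Retry, fetch_batch and reload_packs are inventions for this sketch, not bzrlib API):

class Retry(Exception):
    pass

def stream_records(keys, fetch_batch, reload_packs):
    remaining = set(keys)
    while True:
        try:
            for key, value in fetch_batch(set(remaining)):
                # Only forget a key once its record has actually been yielded,
                # so a failure part-way through resumes exactly where we stopped.
                remaining.discard(key)
                yield key, value
            return
        except Retry:
            # The data moved (for example packs were combined); refresh the
            # view of storage and ask again only for what is still owed.
            reload_packs()

# Example: a fetcher that fails part-way once, then succeeds after "reload".
state = {'failed': False}
def flaky_fetch(keys):
    for key in sorted(keys):
        yield key, 'text-%s' % key
        if not state['failed']:
            state['failed'] = True
            raise Retry()

records = dict(stream_records(['a', 'b'], flaky_fetch, lambda: None))
assert records == {'a': 'text-a', 'b': 'text-b'}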
@@ -1203 +1266 @@
                 for key in parent_map:
                     present_keys.append(key)
                     source_keys[-1][1].append(key)
+            # We have been requested to return these records in an order that
+            # suits us. So we ask the index to give us an optimally sorted
            ...
+            for source, sub_keys in source_keys:
+                if source is parent_maps[0]:
+                    # Only sort the keys for this VF
+                    self._index._sort_keys_by_io(sub_keys, positions)
         absent_keys = keys - set(global_map)
         for key in absent_keys:
             yield AbsentContentFactory(key)
        ...
         if include_delta_closure:
             # XXX: get_content_maps performs its own index queries; allow state
             # to be passed in.
-            text_map, _ = self._get_content_maps(present_keys,
-                needed_from_fallback - absent_keys)
-            for key in present_keys:
-                yield FulltextContentFactory(key, global_map[key], None,
-                    ''.join(text_map[key]))
+            non_local_keys = needed_from_fallback - absent_keys
+            prefix_split_keys = self._split_by_prefix(present_keys)
+            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
+            for prefix, keys in prefix_split_keys.iteritems():
+                non_local = prefix_split_non_local_keys.get(prefix, [])
+                non_local = set(non_local)
+                text_map, _ = self._get_content_maps(keys, non_local)
                ...
+                    lines = text_map.pop(key)
+                    yield ChunkedContentFactory(key, global_map[key], None,
        ...
             for source, keys in source_keys:
                 if source is parent_maps[0]:
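The replacement above bounds memory in two ways: content maps are built one prefix (file-id) at a time, and each text is handed out as a list of lines (ChunkedContentFactory) rather than one joined string. A toy sketch of the same idea, with build_texts_for standing in for _get_content_maps() (hypothetical names, not bzrlib API):

def iter_chunked_texts(keys, build_texts_for):
    by_prefix = {}
    for key in keys:
        by_prefix.setdefault(key[0], []).append(key)
    for prefix, prefix_keys in sorted(by_prefix.items()):
        # Only this prefix's texts are materialised at any one time.
        text_map = build_texts_for(prefix_keys)
        for key in prefix_keys:
            # pop() lets each text be released as soon as it has been yielded.
            yield key, text_map.pop(key)

def fake_build(keys):
    return dict((key, ['line one\n', 'line two\n']) for key in keys)

texts = dict(iter_chunked_texts([('file-a', 'rev-1'), ('file-b', 'rev-1')],
                                fake_build))
assert texts[('file-a', 'rev-1')] == ['line one\n', 'line two\n']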
@@ -1277 +1354 @@
             convertibles = set(["knit-annotated-ft-gz"])
             if self._max_delta_chain:
+                delta_types.add("knit-annotated-delta-gz")
                 convertibles.add("knit-annotated-delta-gz")
         # The set of types we can cheaply adapt without needing basis texts.
         native_types = set()
         if self._max_delta_chain:
             native_types.add("knit-%sdelta-gz" % annotated)
+            delta_types.add("knit-%sdelta-gz" % annotated)
         native_types.add("knit-%sft-gz" % annotated)
         knit_types = native_types.union(convertibles)
        ...
         # can't generate annotations from new deltas until their basis parent
         # is present anyway, so we get away with not needing an index that
         # includes the new keys.
+        # See <http://launchpad.net/bugs/300177> about ordering of compression
+        # parents in the records - to be conservative, we insist that all
+        # parents must be present to avoid expanding to a fulltext.
         # key = basis_parent, value = index entry to add
         buffered_index_entries = {}
         for record in stream:
             parents = record.parents
+            if record.storage_kind in delta_types:
+                # TODO: eventually the record itself should track
+                #       compression_parent
+                compression_parent = parents[0]
            ...
+                compression_parent = None
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
                 raise RevisionNotPresent([record.key], self)
-            if record.storage_kind in knit_types:
+            elif ((record.storage_kind in knit_types)
+                  and (compression_parent is None
+                       or not self._fallback_vfs
+                       or self._index.has_key(compression_parent)
+                       or not self.has_key(compression_parent))):
+                # we can insert the knit record literally if either it has no
+                # compression parent OR we already have its basis in this kvf
+                # OR the basis is not present even in the fallbacks. In the
+                # last case it will either turn up later in the stream and all
+                # will be well, or it won't turn up at all and we'll raise an
                ...
+                # TODO: self.has_key is somewhat redundant with
+                # self._index.has_key; we really want something that directly
+                # asks if it's only present in the fallbacks. -- mbp 20081119
                 if record.storage_kind not in native_types:
                    ...
                     adapter_key = (record.storage_kind, "knit-delta-gz")
@@ -1325 +1429 @@
                 index_entry = (record.key, options, access_memo, parents)
                 buffered = False
                 if 'fulltext' not in options:
-                    basis_parent = parents[0]
+                    # Not a fulltext, so we need to make sure the compression
+                    # parent will also be present.
                     # Note that pack backed knits don't need to buffer here
                     # because they buffer all writes to the transaction level,
                     # but we don't expose that difference at the index level. If
                     # the query here has sufficient cost to show up in
                     # profiling we should do that.
-                    if basis_parent not in self.get_parent_map([basis_parent]):
+                    # They're required to be physically in this
+                    # KnitVersionedFiles, not in a fallback.
+                    if not self._index.has_key(compression_parent):
                         pending = buffered_index_entries.setdefault(
+                            compression_parent, [])
                         pending.append(index_entry)
                         buffered = True
                 if not buffered:
                     self._index.add_records([index_entry])
+            elif record.storage_kind == 'chunked':
+                self.add_lines(record.key, parents,
+                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
             elif record.storage_kind == 'fulltext':
                 self.add_lines(record.key, parents,
                     split_lines(record.get_bytes_as('fulltext')))
            ...
+                # Not a fulltext, and not suitable for direct insertion as a
+                # delta, either because it's not the right format, or this
+                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
+                # 0) or because it depends on a base only present in the
                ...
                 adapter_key = record.storage_kind, 'fulltext'
                 adapter = get_adapter(adapter_key)
                 lines = split_lines(adapter.get_bytes(
@@ -1362 +1478 @@
                     del buffered_index_entries[key]
         # If there were any deltas which had a missing basis parent, error.
         if buffered_index_entries:
-            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
+            from pprint import pformat
+            raise errors.BzrCheckError(
+                "record_stream refers to compression parents not in %r:\n%s"
+                % (self, pformat(sorted(buffered_index_entries.keys()))))
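The buffering scheme above parks any delta whose compression parent has not been seen yet and flushes it, transitively, once that parent is added; anything still buffered at the end of the stream is an error. A stand-alone toy version of that bookkeeping (hypothetical names, not bzrlib API):

def insert_stream(records, index):
    # records: iterable of (key, compression_parent_or_None) pairs;
    # index: a set of keys already inserted.
    buffered = {}    # basis key -> [keys of deltas waiting for that basis]
    for key, basis in records:
        if basis is not None and basis not in index:
            buffered.setdefault(basis, []).append(key)
            continue
        index.add(key)
        # Adding this key may in turn unblock deltas that were waiting on it.
        ready = [key]
        while ready:
            parent = ready.pop()
            for child in buffered.pop(parent, []):
                index.add(child)
                ready.append(child)
    if buffered:
        raise ValueError("stream refers to compression parents not present: %r"
                         % sorted(buffered))

index = set()
insert_stream([('B', 'A'), ('C', 'B'), ('A', None)], index)
assert index == set(['A', 'B', 'C'])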
@@ -1368 +1486 @@
     def iter_lines_added_or_present_in_keys(self, keys, pb=None):
         """Iterate over the lines in the versioned files from keys.
@@ -1390 +1510 @@
             pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            key_records.append((key, details[0]))
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                line_iterator = self._factory.get_fulltext_content(data)
                ...
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        line_iterator = self._factory.get_fulltext_content(data)
                        ...
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
                    ...
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
                        ...
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+        # If there are still keys we've not yet found, we look in the fallback
+        # vfs, and hope to find them there. Note that if the keys are found
+        # but had no changes or no content, the fallback may not return
+        if keys and not self._fallback_vfs:
+            # XXX: strictly the second parameter is meant to be the file id
+            # but it's not easily accessible here.
+            raise RevisionNotPresent(keys, repr(self))
         for source in self._fallback_vfs:
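Because keys are removed from the working set as their lines are yielded, whatever remains after the local pass is exactly what the fallback (stacked) stores are asked for. A simplified stand-alone sketch of that flow, using plain dicts as stores (toy names, not bzrlib API):

def iter_lines(keys, local_store, fallback_stores):
    keys = set(keys)
    for key in list(keys):
        if key in local_store:
            keys.remove(key)     # whatever is left is what fallbacks must supply
            for line in local_store[key]:
                yield line, key
    if keys and not fallback_stores:
        raise KeyError("keys not present and no fallbacks: %r" % sorted(keys))
    for fallback in fallback_stores:
        if not keys:
            break
        for key in list(keys):
            if key in fallback:
                keys.remove(key)
                for line in fallback[key]:
                    yield line, key

local = {('f', 'r1'): ['a\n']}
fallback = {('f', 'r2'): ['b\n']}
lines = list(iter_lines([('f', 'r1'), ('f', 'r2')], local, [fallback]))
assert sorted(lines) == [('a\n', ('f', 'r1')), ('b\n', ('f', 'r2'))]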
@@ -2006 +2143 @@
             self._mode = 'r'

+    def _sort_keys_by_io(self, keys, positions):
+        """Figure out an optimal order to read the records for the given keys.
+
+        Sort keys, grouped by index and sorted by position.
+
+        :param keys: A list of keys whose records we want to read. This will be
            ...
+        :param positions: A dict, such as the one returned by
+            _get_components_positions()
        ...
+        def get_sort_key(key):
+            index_memo = positions[key][1]
+            # Group by prefix and position. index_memo[0] is the key, so it is
+            # (file_id, revision_id) and we don't want to sort on revision_id,
+            # index_memo[1] is the position, and index_memo[2] is the size,
+            # which doesn't matter for the sort
+            return index_memo[0][:-1], index_memo[1]
+        return keys.sort(key=get_sort_key)

     def _split_key(self, key):
         """Split key into a prefix and suffix."""
         return key[:-1], key[-1]
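A small demonstration of the sort key used above: group by key prefix (one .knit data file per prefix) and order by byte offset within it, so the subsequent reads become sequential. The positions dict below is hypothetical toy data, only loosely shaped like (record_details, index_memo, ...):

keys = [('file-b', 'r1'), ('file-a', 'r2'), ('file-a', 'r1')]
positions = {
    ('file-a', 'r1'): ('details', (('file-a', 'r1'), 0, 120)),
    ('file-a', 'r2'): ('details', (('file-a', 'r2'), 120, 80)),
    ('file-b', 'r1'): ('details', (('file-b', 'r1'), 0, 50)),
}

def get_sort_key(key):
    index_memo = positions[key][1]            # (key, byte offset, size)
    return index_memo[0][:-1], index_memo[1]  # (prefix, offset); size ignored

keys.sort(key=get_sort_key)
assert keys == [('file-a', 'r1'), ('file-a', 'r2'), ('file-b', 'r1')]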
@@ -2255 +2414 @@
         self._check_read()
         return [node[1] for node in self._graph_index.iter_all_entries()]

+    missing_keys = _mod_index._missing_keys_from_parent_map

     def _node_to_position(self, node):
         """Convert an index value to position details."""
         bits = node[2][1:].split(' ')
         return node[0], int(bits[0]), int(bits[1])

+    def _sort_keys_by_io(self, keys, positions):
+        """Figure out an optimal order to read the records for the given keys.
+
+        Sort keys, grouped by index and sorted by position.
+
+        :param keys: A list of keys whose records we want to read. This will be
            ...
+        :param positions: A dict, such as the one returned by
+            _get_components_positions()
        ...
+        def get_index_memo(key):
+            # index_memo is at offset [1]. It is made up of (GraphIndex,
+            # position, size). GI is an object, which will be unique for each
+            # pack file. This causes us to group by pack file, then sort by
+            # position. Size doesn't matter, but it isn't worth breaking up the
            ...
+            return positions[key][1]
+        return keys.sort(key=get_index_memo)


 class _KnitKeyAccess(object):
     """Access to records in .knit files."""
@@ -2339 +2520 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""

-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.

         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
        ...
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func

     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
@@ -2399 +2584 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
            ...
+                transport, path = self._indices[index]
            ...
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing, we need to start
                ...
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
                    ...
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
            ...
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk, we need to trigger a reload, and start over.
+                if self._reload_func is None:
                    ...
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())

     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""
        ...
         self._container_writer = writer
         self._write_index = index

+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
        ...
+        :param retry_exc: A RetryWithNewPacks exception.
        ...
+        if self._reload_func is None:
            ...
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
        ...
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback

 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
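Taken together, the changes above define a small retry protocol: the access layer wraps failures caused by vanished packs in RetryWithNewPacks (carrying the original exc_info and whether a reload already happened), and reload_or_raise() either refreshes the caches so the caller can retry or re-raises the underlying error. A stand-alone sketch of that protocol with toy classes (not the bzrlib implementations, and with the three-argument raise simplified away):

import sys

class RetryWithNewPacks(Exception):
    def __init__(self, reload_occurred, exc_info):
        Exception.__init__(self)
        self.reload_occurred = reload_occurred
        self.exc_info = exc_info

class ToyAccess(object):
    def __init__(self, packs, reload_func=None):
        self._packs = packs              # pack name -> raw bytes
        self._reload_func = reload_func

    def get_raw_record(self, name):
        try:
            return self._packs[name]
        except KeyError:
            if self._reload_func is None:
                raise                    # nothing we can do; surface the error
            raise RetryWithNewPacks(reload_occurred=False,
                                    exc_info=sys.exc_info())

    def reload_or_raise(self, retry_exc):
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claims nothing changed; only fatal if there was no
            # earlier reload that could explain the failure.
            is_error = not retry_exc.reload_occurred
        else:
            is_error = False
        if is_error:
            exc_class, exc_value, exc_tb = retry_exc.exc_info
            raise exc_value

def read_record(access, name):
    while True:
        try:
            return access.get_raw_record(name)
        except RetryWithNewPacks as e:
            access.reload_or_raise(e)

# Simulate a repack: the record only becomes visible after the reload
# refreshes the name -> bytes mapping.
access = ToyAccess({}, reload_func=lambda: access._packs.update(
    {'pack-1': 'raw bytes'}) or True)
assert read_record(access, 'pack-1') == 'raw bytes'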
@@ -2655 +2889 @@
                     copy_base_content=(not reuse_content))
                 fulltext = self._add_fulltext_content(rev_id,
                     fulltext_content)
-                blocks = KnitContent.get_line_delta_blocks(delta,
-                        parent_fulltext, fulltext)
+                if compression_parent == parent_ids[0]:
+                    # the compression_parent is the left parent, so we can
                    ...
+                    blocks = KnitContent.get_line_delta_blocks(delta,
+                            parent_fulltext, fulltext)
            ...
                 fulltext_content = self._knit._factory.parse_fulltext(
                     record, rev_id)
                 fulltext = self._add_fulltext_content(rev_id,
                     fulltext_content)
             nodes_to_annotate.extend(
                 self._add_annotation(rev_id, fulltext, parent_ids,
                     left_matching_blocks=blocks))
@@ -2684 +2920 @@
         :param key: The key to annotate.
-        if True or len(self._knit._fallback_vfs) > 0:
+        if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
        ...
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()

     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.