        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        # This retries the whole request if anything fails. Potentially we
        # could be a bit more selective. We could track the keys whose records
        # we have successfully found, and then only request the new records
        # from there. However, _get_components_positions grabs the whole build
        # chain, which means we'll likely try to grab the same records again
        # anyway. Also, can the build chains change as part of a pack
        # operation? We wouldn't want to end up with a broken chain.
        while True:
            try:
                position_map = self._get_components_positions(keys,
                    allow_missing=allow_missing)
                # key = component_id, r = record_details, i_m = index_memo,
                # n = next
                records = [(key, i_m) for key, (r, i_m, n)
                                       in position_map.iteritems()]
                record_map = {}
                for key, record, digest in self._read_records_iter(records):
                    (record_details, index_memo, next) = position_map[key]
                    record_map[key] = record, record_details, digest, next
                return record_map
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
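        # The comment above floats a more selective retry as a possible
        # improvement. A sketch of what that might look like (hypothetical,
        # not the shipped behaviour): drop keys from a pending set as their
        # records arrive, so a retry only re-requests the remainder.
        #
        #     record_map = {}
        #     pending = set(keys)
        #     while True:
        #         try:
        #             position_map = self._get_components_positions(pending,
        #                 allow_missing=allow_missing)
        #             records = [(key, i_m) for key, (r, i_m, n)
        #                                    in position_map.iteritems()]
        #             for key, record, digest in self._read_records_iter(records):
        #                 (record_details, index_memo, next) = position_map[key]
        #                 record_map[key] = record, record_details, digest, next
        #                 pending.discard(key)
        #             return record_map
        #         except errors.RetryWithNewPacks, e:
        #             self._access.reload_or_raise(e)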

    def _split_by_prefix(self, keys):
        """For the given keys, split them up based on their prefix.

        if not self._index.has_graph:
            # Cannot topological order when no graph has been stored.
            ordering = 'unordered'

        remaining_keys = keys
        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                    ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
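        # The retry shape matters here: this is a generator, so records
        # already yielded must not be yielded twice. Discarding each key from
        # remaining_keys as its factory is yielded means a retried pass
        # rebuilds the request from only the keys the caller has not seen.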

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(keys, allow_missing=True)

                    split_lines(record.get_bytes_as('fulltext')))
            else:
                # Not a fulltext, and not suitable for direct insertion as a
                # delta, either because it's not the right format, or this
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
                # 0) or because it depends on a base only present in the
                # fallback kvfs.
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
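                # get_adapter resolves an adapter for the
                # (storage_kind, 'fulltext') pair from the adapter registry,
                # keeping one instance per pair so repeated records of the
                # same kind reuse the same converter.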

            pb = progress.DummyProgress()
        keys = set(keys)
        total = len(keys)
        while True:
            try:
                # we don't care about inclusions, the caller cares.
                # but we need to setup a list of records to visit.
                # we need key, position, length
                key_records = []
                build_details = self._index.get_build_details(keys)
                for key, details in build_details.iteritems():
                    if key in keys:
                        key_records.append((key, details[0]))
                records_iter = enumerate(self._read_records_iter(key_records))
                for (key_idx, (key, data, sha_value)) in records_iter:
                    pb.update('Walking content.', key_idx, total)
                    compression_parent = build_details[key][1]
                    if compression_parent is None:
                        line_iterator = self._factory.get_fulltext_content(data)
                    else:
                        line_iterator = self._factory.get_linedelta_content(data)
                    # Now that we are yielding the data for this key, remove it
                    # from the list
                    keys.remove(key)
                    # XXX: It might be more efficient to yield (key,
                    # line_iterator) in the future. However for now, this is a
                    # simpler change to integrate into the rest of the
                    # codebase. RBC 20071110
                    for line in line_iterator:
                        yield line, key
                break
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
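        # Because this generator can be resumed after a RetryWithNewPacks,
        # each key is dropped from 'keys' before its lines are yielded: a
        # retried pass then requests, and yields, only the keys whose content
        # the caller has not yet seen.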
        # If there are still keys we've not yet found, we look in the fallback
        # vfs, and hope to find them there. Note that if the keys are found
        # but had no changes or no content, the fallback may not return
        # anything.


class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
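        # Example wiring (illustrative names only; the real construction
        # lives in the pack repository code): a pack collection's
        # reload_pack_names, which returns True when reloading found changed
        # pack names, makes a suitable reload_func.
        #
        #     access = _DirectPackAccess(indices,
        #         reload_func=pack_collection.reload_pack_names)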

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done, raise
                    raise
                raise errors.RetryWithNewPacks(reload_occurred=True,
                                               exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(reload_occurred=False,
                                               exc_info=sys.exc_info())
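        # The two failure paths deliberately encode different states:
        # reload_occurred=True (KeyError) means an index reload has already
        # happened and our cached index object is stale, while
        # reload_occurred=False (NoSuchFile) means the pack file vanished and
        # a reload is still needed. reload_or_raise() uses this distinction
        # to decide whether a failed reload is fatal.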

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback
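
    # The intended calling pattern, per the docstring above (a sketch;
    # do_read stands in for whatever pack operation is being retried):
    #
    #     while True:
    #         try:
    #             return do_read()
    #         except errors.RetryWithNewPacks, e:
    #             access.reload_or_raise(e)
    #             # If reload_or_raise returned, the packs were reloaded and
    #             # the operation is retried; otherwise it re-raised the
    #             # original error.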


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher

        if len(self._knit._fallback_vfs) > 0:
            # stacked knits can't use the fast path at present.
            return self._simple_annotate(key)
        while True:
            try:
                records = self._get_build_graph(key)
                if key in self._ghosts:
                    raise errors.RevisionNotPresent(key, self._knit)
                self._annotate_records(records)
                return self._annotated_lines[key]
            except errors.RetryWithNewPacks, e:
                self._knit._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()
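                # A successful reload means the pack names changed, so index
                # memos cached in _all_build_details may point at packs that
                # no longer exist; the next pass through the loop rebuilds
                # them from scratch.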

    def _simple_annotate(self, key):
        """Return annotated fulltext, rediffing from the full texts.