@@ -1108 +1114 @@
         :param allow_missing: If some records are missing, rather than
             error, just return the data that could be generated.
         """
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-                   in position_map.iteritems()]
-        record_map = {}
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
-        return record_map
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
+        while True:
+            try:
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                # n = next
+                records = [(key, i_m) for key, (r, i_m, n)
+                           in position_map.iteritems()]
+                record_map = {}
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+                return record_map
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
 
     def _split_by_prefix(self, keys):
         """For the given keys, split them up based on their prefix.
@@ -1159 +1176 @@
         if not self._index.has_graph:
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
+
+        remaining_keys = keys
+        while True:
+            try:
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                        ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+                return
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+
+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
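
get_record_stream is a generator, so it cannot blindly re-run the whole
request after a reload: records already yielded must not be yielded twice.
That is why the method is split as above: the outer loop owns remaining_keys,
discards each key as its factory is yielded, and re-requests only the
still-missing keys via _get_remaining_record_stream. A sketch of that
discard-as-you-yield pattern; fetch_records stands in (hypothetically) for
_get_remaining_record_stream:

    from bzrlib import errors

    def stream_with_retry(access, fetch_records, keys):
        remaining = set(keys)
        while True:
            try:
                for record in fetch_records(set(remaining)):
                    # Forget a key as soon as its record is yielded, so a
                    # retry only asks for what the caller has not yet seen.
                    remaining.discard(record.key)
                    yield record
                return
            except errors.RetryWithNewPacks, e:
                access.reload_or_raise(e)
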
@@ -1323 +1356 @@
         # can't generate annotations from new deltas until their basis parent
         # is present anyway, so we get away with not needing an index that
         # includes the new keys.
+        #
+        # See <http://launchpad.net/bugs/300177> about ordering of compression
+        # parents in the records - to be conservative, we insist that all
+        # parents must be present to avoid expanding to a fulltext.
+        #
         # key = basis_parent, value = index entry to add
         buffered_index_entries = {}
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
                 raise RevisionNotPresent([record.key], self)
-            if record.storage_kind in knit_types:
+            elif ((record.storage_kind in knit_types)
+                  and (not parents
+                       or not self._fallback_vfs
+                       or not self._index.missing_keys(parents)
+                       or self.missing_keys(parents))):
+                # we can insert the knit record literally if either it has no
+                # compression parent OR we already have its basis in this kvf
+                # OR the basis is not present even in the fallbacks. In the
+                # last case it will either turn up later in the stream and all
+                # will be well, or it won't turn up at all and we'll raise an
+                # error at the end.
+                #
+                # TODO: self.has_key is somewhat redundant with
+                # self._index.has_key; we really want something that directly
+                # asks if it's only present in the fallbacks. -- mbp 20081119
                 if record.storage_kind not in native_types:
                     try:
                         adapter_key = (record.storage_kind, "knit-delta-gz")

@@ -1358 +1410 @@
                 index_entry = (record.key, options, access_memo, parents)
                 buffered = False
                 if 'fulltext' not in options:
-                    basis_parent = parents[0]
+                    # Not a fulltext, so we need to make sure the compression
+                    # parent will also be present.
                     # Note that pack backed knits don't need to buffer here
                     # because they buffer all writes to the transaction level,
                     # but we don't expose that difference at the index level. If
                     # the query here has sufficient cost to show up in
                     # profiling we should do that.
-                    if basis_parent not in self.get_parent_map([basis_parent]):
+                    #
+                    # They're required to be physically in this
+                    # KnitVersionedFiles, not in a fallback.
+                    compression_parent = parents[0]
+                    if self.missing_keys([compression_parent]):
                         pending = buffered_index_entries.setdefault(
-                            basis_parent, [])
+                            compression_parent, [])
                         pending.append(index_entry)
                         buffered = True
                 if not buffered:
                     self._index.add_records([index_entry])
             elif record.storage_kind == 'fulltext':
                 self.add_lines(record.key, parents,
                     split_lines(record.get_bytes_as('fulltext')))
             else:
+                # Not a fulltext, and not suitable for direct insertion as a
+                # delta, either because it's not the right format, or this
+                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
+                # 0) or because it depends on a base only present in the
+                # fallback kvfs.
                 adapter_key = record.storage_kind, 'fulltext'
                 adapter = get_adapter(adapter_key)
                 lines = split_lines(adapter.get_bytes(

@@ -1395 +1457 @@
                     del buffered_index_entries[key]
         # If there were any deltas which had a missing basis parent, error.
         if buffered_index_entries:
-            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
-                self)
+            from pprint import pformat
+            raise errors.BzrCheckError(
+                "record_stream refers to compression parents not in %r:\n%s"
+                % (self, pformat(sorted(buffered_index_entries.keys()))))
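
The buffering above exists because a delta record may arrive before its
compression parent (bug 300177). Rather than expanding such a delta to a
fulltext, its index entry is parked in buffered_index_entries under the
missing parent's key and flushed once that parent is inserted; anything still
buffered when the stream ends means the stream was incomplete, which now
raises BzrCheckError. A reduced sketch of the bookkeeping, assuming records
expose a key attribute and with insert and parent_of as hypothetical
callables:

    def insert_out_of_order(records, insert, parent_of):
        buffered = {}   # missing parent key -> [record, ...]
        present = set()
        for record in records:
            parent = parent_of(record)
            if parent is not None and parent not in present:
                # Parent not seen yet: park the record until it arrives.
                buffered.setdefault(parent, []).append(record)
                continue
            # Insert the record, then flush everything it unblocks,
            # transitively (a flushed delta may itself be a parent).
            todo = [record]
            while todo:
                rec = todo.pop()
                insert(rec)
                present.add(rec.key)
                todo.extend(buffered.pop(rec.key, []))
        if buffered:
            raise ValueError("stream refers to absent compression parents:"
                             " %r" % sorted(buffered.keys()))
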
 
     def iter_lines_added_or_present_in_keys(self, keys, pb=None):
         """Iterate over the lines in the versioned files from keys.
@@ -1423 +1489 @@
         pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        key_records = []
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            key_records.append((key, details[0]))
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                # fulltext
-                line_iterator = self._factory.get_fulltext_content(data)
-            else:
-                # Delta
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
-                yield line, key
+        done = False
+        while not done:
+            try:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                key_records = []
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        # fulltext
+                        line_iterator = self._factory.get_fulltext_content(data)
+                    else:
+                        # Delta
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
+                    # from the list
+                    keys.remove(key)
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
+                        yield line, key
+                done = True
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+        # If there are still keys we've not yet found, we look in the fallback
+        # vfs, and hope to find them there. Note that if the keys are found
+        # but had no changes or no content, the fallback may not return
+        # anything.
+        if keys and not self._fallback_vfs:
+            # XXX: strictly the second parameter is meant to be the file id
+            # but it's not easily accessible here.
+            raise RevisionNotPresent(keys, repr(self))
         for source in self._fallback_vfs:
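
Note how iter_lines_added_or_present_in_keys consumes keys as it yields
(keys.remove(key)): after the retry loop, whatever is left is either handed
to the fallback versioned files or reported as missing. A sketch of that
leftover delegation, using the real get_record_stream signature but with
otherwise illustrative names:

    def records_from_fallbacks(keys, fallbacks):
        # Keys not satisfied locally are looked up in each fallback in turn;
        # only keys missing from every source are an error.
        keys = set(keys)
        for source in fallbacks:
            if not keys:
                break
            for record in source.get_record_stream(keys, 'unordered', True):
                if record.storage_kind != 'absent':
                    keys.discard(record.key)
                    yield record
        if keys:
            raise KeyError("keys missing from all sources: %r" % sorted(keys))
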
@@ -2373 +2459 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""
 
-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.
 
         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
         """
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func
 
     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
@@ -2433 +2523 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing, we need to start
+                # over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
+                    # be done right now, so just raise the exception.
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk, we need to trigger a reload, and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())
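
Two distinct failure signals are translated above: a KeyError on the index
dict means someone has already reloaded and this index object is stale
(reload_occurred=True), while NoSuchFile means a pack file vanished on disk
and a reload is still needed (reload_occurred=False). Carrying sys.exc_info()
along lets reload_or_raise re-raise the original exception, with its original
traceback, when retrying would be pointless. The translation in isolation,
with lookup and read as hypothetical callables:

    import sys
    from bzrlib import errors

    def translate_pack_errors(lookup, read):
        try:
            transport, path = lookup()
        except KeyError:
            # The index was replaced by a reload: ask the caller to retry.
            raise errors.RetryWithNewPacks(reload_occurred=True,
                                           exc_info=sys.exc_info())
        try:
            return read(transport, path)
        except errors.NoSuchFile:
            # The pack file vanished on disk: reload, then retry.
            raise errors.RetryWithNewPacks(reload_occurred=False,
                                           exc_info=sys.exc_info())
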
@@ -2441 +2550 @@
     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""
         if index is not None:
             self._indices[index] = transport_packname
         self._container_writer = writer
         self._write_index = index
+
+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error.
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
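
The decision above boils down to: no reload hook means the failure is fatal;
a reload that found nothing new is also fatal unless the original failure
already followed someone else's reload (in which case our cached index was
stale and a retry may still succeed). The three-argument raise re-raises the
original exception with its original traceback (Python 2 syntax). The same
logic as a small predicate, for illustration only:

    def retry_is_useless(have_reload_func, reload_changed, reload_occurred):
        # Mirrors the is_error logic in reload_or_raise() above.
        if not have_reload_func:
            return True
        if not reload_changed and not reload_occurred:
            # Nothing changed, and no prior reload explains the failure.
            return True
        return False
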
 
 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
@@ -2723 +2861 @@
         if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
+        while True:
+            try:
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()
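
The annotator adds one wrinkle to the idiom: besides reloading, it must drop
_all_build_details, because those cached index memos point into the old pack
files and would simply fail again. The general rule is that any cache derived
from pack positions must be invalidated before retrying. A compact sketch,
with the cache contents and compute callable hypothetical:

    from bzrlib import errors

    class _CachedPackReader(object):

        def __init__(self, access):
            self._access = access
            self._details_cache = {}    # key -> index memo; stale on reload

        def read(self, key, compute):
            while True:
                try:
                    return compute(key)
                except errors.RetryWithNewPacks, e:
                    self._access.reload_or_raise(e)
                    # Cached memos refer to the pre-reload pack files; clear
                    # them so the retry rebuilds from the new indices.
                    self._details_cache.clear()
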
 
     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.