/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Ian Clatworthy
  • Date: 2008-12-15 06:18:29 UTC
  • mfrom: (3905 +trunk)
  • mto: (3586.1.23 views-ui)
  • mto: This revision was merged to the branch mainline in revision 4030.
  • Revision ID: ian.clatworthy@canonical.com-20081215061829-c8qwa93g71u9fsh5
merge bzr.dev 3905

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
64
64
from itertools import izip, chain
65
65
import operator
66
66
import os
 
67
import sys
67
68
 
68
69
from bzrlib.lazy_import import lazy_import
69
70
lazy_import(globals(), """
95
96
    KnitHeaderError,
96
97
    RevisionNotPresent,
97
98
    RevisionAlreadyPresent,
 
99
    SHA1KnitCorrupt,
98
100
    )
99
101
from bzrlib.osutils import (
100
102
    contains_whitespace,
108
110
    adapter_registry,
109
111
    ConstantMapper,
110
112
    ContentFactory,
111
 
    FulltextContentFactory,
 
113
    ChunkedContentFactory,
112
114
    VersionedFile,
113
115
    VersionedFiles,
114
116
    )
194
196
            [compression_parent], 'unordered', True).next()
195
197
        if basis_entry.storage_kind == 'absent':
196
198
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
197
 
        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
 
199
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
200
        basis_lines = osutils.chunks_to_lines(basis_chunks)
198
201
        # Manually apply the delta because we have one annotated content and
199
202
        # one plain.
200
203
        basis_content = PlainKnitContent(basis_lines, compression_parent)
227
230
            [compression_parent], 'unordered', True).next()
228
231
        if basis_entry.storage_kind == 'absent':
229
232
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
230
 
        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
 
233
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
234
        basis_lines = osutils.chunks_to_lines(basis_chunks)
231
235
        basis_content = PlainKnitContent(basis_lines, compression_parent)
232
236
        # Manually apply the delta because we have one annotated content and
233
237
        # one plain.
274
278
    def get_bytes_as(self, storage_kind):
275
279
        if storage_kind == self.storage_kind:
276
280
            return self._raw_record
277
 
        if storage_kind == 'fulltext' and self._knit is not None:
278
 
            return self._knit.get_text(self.key[0])
279
 
        else:
280
 
            raise errors.UnavailableRepresentation(self.key, storage_kind,
281
 
                self.storage_kind)
 
281
        if self._knit is not None:
 
282
            if storage_kind == 'chunked':
 
283
                return self._knit.get_lines(self.key[0])
 
284
            elif storage_kind == 'fulltext':
 
285
                return self._knit.get_text(self.key[0])
 
286
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
287
            self.storage_kind)
282
288
 
283
289
 
284
290
class KnitContent(object):
706
712
    """
707
713
 
708
714
    def __init__(self, index, data_access, max_delta_chain=200,
709
 
        annotated=False):
 
715
                 annotated=False, reload_func=None):
710
716
        """Create a KnitVersionedFiles with index and data_access.
711
717
 
712
718
        :param index: The index for the knit data.
716
722
            insertion. Set to 0 to prohibit the use of deltas.
717
723
        :param annotated: Set to True to cause annotations to be calculated and
718
724
            stored during insertion.
 
725
        :param reload_func: An function that can be called if we think we need
 
726
            to reload the pack listing and try again. See
 
727
            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
719
728
        """
720
729
        self._index = index
721
730
        self._access = data_access
725
734
        else:
726
735
            self._factory = KnitPlainFactory()
727
736
        self._fallback_vfs = []
 
737
        self._reload_func = reload_func
 
738
 
 
739
    def __repr__(self):
 
740
        return "%s(%r, %r)" % (
 
741
            self.__class__.__name__,
 
742
            self._index,
 
743
            self._access)
728
744
 
729
745
    def add_fallback_versioned_files(self, a_versioned_files):
730
746
        """Add a source of texts for texts not present in this knit.
763
779
        present_parents = []
764
780
        if parent_texts is None:
765
781
            parent_texts = {}
766
 
        # Do a single query to ascertain parent presence.
767
 
        present_parent_map = self.get_parent_map(parents)
 
782
        # Do a single query to ascertain parent presence; we only compress
 
783
        # against parents in the same kvf.
 
784
        present_parent_map = self._index.get_parent_map(parents)
768
785
        for parent in parents:
769
786
            if parent in present_parent_map:
770
787
                present_parents.append(parent)
1007
1024
                if record.storage_kind == 'absent':
1008
1025
                    continue
1009
1026
                missing_keys.remove(record.key)
1010
 
                lines = split_lines(record.get_bytes_as('fulltext'))
 
1027
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1011
1028
                text_map[record.key] = lines
1012
1029
                content_map[record.key] = PlainKnitContent(lines, record.key)
1013
1030
                if record.key in keys:
1048
1065
            text = content.text()
1049
1066
            actual_sha = sha_strings(text)
1050
1067
            if actual_sha != digest:
1051
 
                raise KnitCorrupt(self,
1052
 
                    '\n  sha-1 %s'
1053
 
                    '\n  of reconstructed text does not match'
1054
 
                    '\n  expected %s'
1055
 
                    '\n  for version %s' %
1056
 
                    (actual_sha, digest, key))
 
1068
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1057
1069
            text_map[key] = text
1058
1070
        return text_map, final_content
1059
1071
 
1106
1118
        :param allow_missing: If some records are missing, rather than 
1107
1119
            error, just return the data that could be generated.
1108
1120
        """
1109
 
        position_map = self._get_components_positions(keys,
1110
 
            allow_missing=allow_missing)
1111
 
        # key = component_id, r = record_details, i_m = index_memo, n = next
1112
 
        records = [(key, i_m) for key, (r, i_m, n)
1113
 
                             in position_map.iteritems()]
1114
 
        record_map = {}
1115
 
        for key, record, digest in \
1116
 
                self._read_records_iter(records):
1117
 
            (record_details, index_memo, next) = position_map[key]
1118
 
            record_map[key] = record, record_details, digest, next
1119
 
        return record_map
 
1121
        # This retries the whole request if anything fails. Potentially we
 
1122
        # could be a bit more selective. We could track the keys whose records
 
1123
        # we have successfully found, and then only request the new records
 
1124
        # from there. However, _get_components_positions grabs the whole build
 
1125
        # chain, which means we'll likely try to grab the same records again
 
1126
        # anyway. Also, can the build chains change as part of a pack
 
1127
        # operation? We wouldn't want to end up with a broken chain.
 
1128
        while True:
 
1129
            try:
 
1130
                position_map = self._get_components_positions(keys,
 
1131
                    allow_missing=allow_missing)
 
1132
                # key = component_id, r = record_details, i_m = index_memo,
 
1133
                # n = next
 
1134
                records = [(key, i_m) for key, (r, i_m, n)
 
1135
                                       in position_map.iteritems()]
 
1136
                record_map = {}
 
1137
                for key, record, digest in self._read_records_iter(records):
 
1138
                    (record_details, index_memo, next) = position_map[key]
 
1139
                    record_map[key] = record, record_details, digest, next
 
1140
                return record_map
 
1141
            except errors.RetryWithNewPacks, e:
 
1142
                self._access.reload_or_raise(e)
 
1143
 
 
1144
    def _split_by_prefix(self, keys):
 
1145
        """For the given keys, split them up based on their prefix.
 
1146
 
 
1147
        To keep memory pressure somewhat under control, split the
 
1148
        requests back into per-file-id requests, otherwise "bzr co"
 
1149
        extracts the full tree into memory before writing it to disk.
 
1150
        This should be revisited if _get_content_maps() can ever cross
 
1151
        file-id boundaries.
 
1152
 
 
1153
        :param keys: An iterable of key tuples
 
1154
        :return: A dict of {prefix: [key_list]}
 
1155
        """
 
1156
        split_by_prefix = {}
 
1157
        for key in keys:
 
1158
            if len(key) == 1:
 
1159
                split_by_prefix.setdefault('', []).append(key)
 
1160
            else:
 
1161
                split_by_prefix.setdefault(key[0], []).append(key)
 
1162
        return split_by_prefix
1120
1163
 
1121
1164
    def get_record_stream(self, keys, ordering, include_delta_closure):
1122
1165
        """Get a stream of records for keys.
1137
1180
        if not self._index.has_graph:
1138
1181
            # Cannot topological order when no graph has been stored.
1139
1182
            ordering = 'unordered'
 
1183
 
 
1184
        remaining_keys = keys
 
1185
        while True:
 
1186
            try:
 
1187
                keys = set(remaining_keys)
 
1188
                for content_factory in self._get_remaining_record_stream(keys,
 
1189
                                            ordering, include_delta_closure):
 
1190
                    remaining_keys.discard(content_factory.key)
 
1191
                    yield content_factory
 
1192
                return
 
1193
            except errors.RetryWithNewPacks, e:
 
1194
                self._access.reload_or_raise(e)
 
1195
 
 
1196
    def _get_remaining_record_stream(self, keys, ordering,
 
1197
                                     include_delta_closure):
 
1198
        """This function is the 'retry' portion for get_record_stream."""
1140
1199
        if include_delta_closure:
1141
1200
            positions = self._get_components_positions(keys, allow_missing=True)
1142
1201
        else:
1195
1254
                    current_source = key_source
1196
1255
                source_keys[-1][1].append(key)
1197
1256
        else:
 
1257
            if ordering != 'unordered':
 
1258
                raise AssertionError('valid values for ordering are:'
 
1259
                    ' "unordered" or "topological" not: %r'
 
1260
                    % (ordering,))
1198
1261
            # Just group by source; remote sources first.
1199
1262
            present_keys = []
1200
1263
            source_keys = []
1203
1266
                for key in parent_map:
1204
1267
                    present_keys.append(key)
1205
1268
                    source_keys[-1][1].append(key)
 
1269
            # We have been requested to return these records in an order that
 
1270
            # suits us. So we ask the index to give us an optimally sorted
 
1271
            # order.
 
1272
            for source, sub_keys in source_keys:
 
1273
                if source is parent_maps[0]:
 
1274
                    # Only sort the keys for this VF
 
1275
                    self._index._sort_keys_by_io(sub_keys, positions)
1206
1276
        absent_keys = keys - set(global_map)
1207
1277
        for key in absent_keys:
1208
1278
            yield AbsentContentFactory(key)
1213
1283
        if include_delta_closure:
1214
1284
            # XXX: get_content_maps performs its own index queries; allow state
1215
1285
            # to be passed in.
1216
 
            text_map, _ = self._get_content_maps(present_keys,
1217
 
                needed_from_fallback - absent_keys)
1218
 
            for key in present_keys:
1219
 
                yield FulltextContentFactory(key, global_map[key], None,
1220
 
                    ''.join(text_map[key]))
 
1286
            non_local_keys = needed_from_fallback - absent_keys
 
1287
            prefix_split_keys = self._split_by_prefix(present_keys)
 
1288
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
 
1289
            for prefix, keys in prefix_split_keys.iteritems():
 
1290
                non_local = prefix_split_non_local_keys.get(prefix, [])
 
1291
                non_local = set(non_local)
 
1292
                text_map, _ = self._get_content_maps(keys, non_local)
 
1293
                for key in keys:
 
1294
                    lines = text_map.pop(key)
 
1295
                    yield ChunkedContentFactory(key, global_map[key], None,
 
1296
                                                lines)
1221
1297
        else:
1222
1298
            for source, keys in source_keys:
1223
1299
                if source is parent_maps[0]:
1267
1343
                adapter = adapter_factory(self)
1268
1344
                adapters[adapter_key] = adapter
1269
1345
                return adapter
 
1346
        delta_types = set()
1270
1347
        if self._factory.annotated:
1271
1348
            # self is annotated, we need annotated knits to use directly.
1272
1349
            annotated = "annotated-"
1276
1353
            annotated = ""
1277
1354
            convertibles = set(["knit-annotated-ft-gz"])
1278
1355
            if self._max_delta_chain:
 
1356
                delta_types.add("knit-annotated-delta-gz")
1279
1357
                convertibles.add("knit-annotated-delta-gz")
1280
1358
        # The set of types we can cheaply adapt without needing basis texts.
1281
1359
        native_types = set()
1282
1360
        if self._max_delta_chain:
1283
1361
            native_types.add("knit-%sdelta-gz" % annotated)
 
1362
            delta_types.add("knit-%sdelta-gz" % annotated)
1284
1363
        native_types.add("knit-%sft-gz" % annotated)
1285
1364
        knit_types = native_types.union(convertibles)
1286
1365
        adapters = {}
1290
1369
        # can't generate annotations from new deltas until their basis parent
1291
1370
        # is present anyway, so we get away with not needing an index that
1292
1371
        # includes the new keys.
 
1372
        #
 
1373
        # See <http://launchpad.net/bugs/300177> about ordering of compression
 
1374
        # parents in the records - to be conservative, we insist that all
 
1375
        # parents must be present to avoid expanding to a fulltext.
 
1376
        #
1293
1377
        # key = basis_parent, value = index entry to add
1294
1378
        buffered_index_entries = {}
1295
1379
        for record in stream:
1296
1380
            parents = record.parents
 
1381
            if record.storage_kind in delta_types:
 
1382
                # TODO: eventually the record itself should track
 
1383
                #       compression_parent
 
1384
                compression_parent = parents[0]
 
1385
            else:
 
1386
                compression_parent = None
1297
1387
            # Raise an error when a record is missing.
1298
1388
            if record.storage_kind == 'absent':
1299
1389
                raise RevisionNotPresent([record.key], self)
1300
 
            if record.storage_kind in knit_types:
 
1390
            elif ((record.storage_kind in knit_types)
 
1391
                  and (compression_parent is None
 
1392
                       or not self._fallback_vfs
 
1393
                       or self._index.has_key(compression_parent)
 
1394
                       or not self.has_key(compression_parent))):
 
1395
                # we can insert the knit record literally if either it has no
 
1396
                # compression parent OR we already have its basis in this kvf
 
1397
                # OR the basis is not present even in the fallbacks.  In the
 
1398
                # last case it will either turn up later in the stream and all
 
1399
                # will be well, or it won't turn up at all and we'll raise an
 
1400
                # error at the end.
 
1401
                #
 
1402
                # TODO: self.has_key is somewhat redundant with
 
1403
                # self._index.has_key; we really want something that directly
 
1404
                # asks if it's only present in the fallbacks. -- mbp 20081119
1301
1405
                if record.storage_kind not in native_types:
1302
1406
                    try:
1303
1407
                        adapter_key = (record.storage_kind, "knit-delta-gz")
1325
1429
                index_entry = (record.key, options, access_memo, parents)
1326
1430
                buffered = False
1327
1431
                if 'fulltext' not in options:
1328
 
                    basis_parent = parents[0]
 
1432
                    # Not a fulltext, so we need to make sure the compression
 
1433
                    # parent will also be present.
1329
1434
                    # Note that pack backed knits don't need to buffer here
1330
1435
                    # because they buffer all writes to the transaction level,
1331
1436
                    # but we don't expose that difference at the index level. If
1332
1437
                    # the query here has sufficient cost to show up in
1333
1438
                    # profiling we should do that.
1334
 
                    if basis_parent not in self.get_parent_map([basis_parent]):
 
1439
                    #
 
1440
                    # They're required to be physically in this
 
1441
                    # KnitVersionedFiles, not in a fallback.
 
1442
                    if not self._index.has_key(compression_parent):
1335
1443
                        pending = buffered_index_entries.setdefault(
1336
 
                            basis_parent, [])
 
1444
                            compression_parent, [])
1337
1445
                        pending.append(index_entry)
1338
1446
                        buffered = True
1339
1447
                if not buffered:
1340
1448
                    self._index.add_records([index_entry])
 
1449
            elif record.storage_kind == 'chunked':
 
1450
                self.add_lines(record.key, parents,
 
1451
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1341
1452
            elif record.storage_kind == 'fulltext':
1342
1453
                self.add_lines(record.key, parents,
1343
1454
                    split_lines(record.get_bytes_as('fulltext')))
1344
1455
            else:
 
1456
                # Not a fulltext, and not suitable for direct insertion as a
 
1457
                # delta, either because it's not the right format, or this
 
1458
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
 
1459
                # 0) or because it depends on a base only present in the
 
1460
                # fallback kvfs.
1345
1461
                adapter_key = record.storage_kind, 'fulltext'
1346
1462
                adapter = get_adapter(adapter_key)
1347
1463
                lines = split_lines(adapter.get_bytes(
1362
1478
                    del buffered_index_entries[key]
1363
1479
        # If there were any deltas which had a missing basis parent, error.
1364
1480
        if buffered_index_entries:
1365
 
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
1366
 
                self)
 
1481
            from pprint import pformat
 
1482
            raise errors.BzrCheckError(
 
1483
                "record_stream refers to compression parents not in %r:\n%s"
 
1484
                % (self, pformat(sorted(buffered_index_entries.keys()))))
1367
1485
 
1368
1486
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1369
1487
        """Iterate over the lines in the versioned files from keys.
1380
1498
        is an iterator).
1381
1499
 
1382
1500
        NOTES:
1383
 
         * Lines are normalised by the underlying store: they will all have \n
 
1501
         * Lines are normalised by the underlying store: they will all have \\n
1384
1502
           terminators.
1385
1503
         * Lines are returned in arbitrary order.
 
1504
         * If a requested key did not change any lines (or didn't have any
 
1505
           lines), it may not be mentioned at all in the result.
1386
1506
 
1387
1507
        :return: An iterator over (line, key).
1388
1508
        """
1390
1510
            pb = progress.DummyProgress()
1391
1511
        keys = set(keys)
1392
1512
        total = len(keys)
1393
 
        # we don't care about inclusions, the caller cares.
1394
 
        # but we need to setup a list of records to visit.
1395
 
        # we need key, position, length
1396
 
        key_records = []
1397
 
        build_details = self._index.get_build_details(keys)
1398
 
        for key, details in build_details.iteritems():
1399
 
            if key in keys:
1400
 
                key_records.append((key, details[0]))
1401
 
                keys.remove(key)
1402
 
        records_iter = enumerate(self._read_records_iter(key_records))
1403
 
        for (key_idx, (key, data, sha_value)) in records_iter:
1404
 
            pb.update('Walking content.', key_idx, total)
1405
 
            compression_parent = build_details[key][1]
1406
 
            if compression_parent is None:
1407
 
                # fulltext
1408
 
                line_iterator = self._factory.get_fulltext_content(data)
1409
 
            else:
1410
 
                # Delta 
1411
 
                line_iterator = self._factory.get_linedelta_content(data)
1412
 
            # XXX: It might be more efficient to yield (key,
1413
 
            # line_iterator) in the future. However for now, this is a simpler
1414
 
            # change to integrate into the rest of the codebase. RBC 20071110
1415
 
            for line in line_iterator:
1416
 
                yield line, key
 
1513
        done = False
 
1514
        while not done:
 
1515
            try:
 
1516
                # we don't care about inclusions, the caller cares.
 
1517
                # but we need to setup a list of records to visit.
 
1518
                # we need key, position, length
 
1519
                key_records = []
 
1520
                build_details = self._index.get_build_details(keys)
 
1521
                for key, details in build_details.iteritems():
 
1522
                    if key in keys:
 
1523
                        key_records.append((key, details[0]))
 
1524
                records_iter = enumerate(self._read_records_iter(key_records))
 
1525
                for (key_idx, (key, data, sha_value)) in records_iter:
 
1526
                    pb.update('Walking content.', key_idx, total)
 
1527
                    compression_parent = build_details[key][1]
 
1528
                    if compression_parent is None:
 
1529
                        # fulltext
 
1530
                        line_iterator = self._factory.get_fulltext_content(data)
 
1531
                    else:
 
1532
                        # Delta 
 
1533
                        line_iterator = self._factory.get_linedelta_content(data)
 
1534
                    # Now that we are yielding the data for this key, remove it
 
1535
                    # from the list
 
1536
                    keys.remove(key)
 
1537
                    # XXX: It might be more efficient to yield (key,
 
1538
                    # line_iterator) in the future. However for now, this is a
 
1539
                    # simpler change to integrate into the rest of the
 
1540
                    # codebase. RBC 20071110
 
1541
                    for line in line_iterator:
 
1542
                        yield line, key
 
1543
                done = True
 
1544
            except errors.RetryWithNewPacks, e:
 
1545
                self._access.reload_or_raise(e)
 
1546
        # If there are still keys we've not yet found, we look in the fallback
 
1547
        # vfs, and hope to find them there.  Note that if the keys are found
 
1548
        # but had no changes or no content, the fallback may not return
 
1549
        # anything.  
 
1550
        if keys and not self._fallback_vfs:
 
1551
            # XXX: strictly the second parameter is meant to be the file id
 
1552
            # but it's not easily accessible here.
 
1553
            raise RevisionNotPresent(keys, repr(self))
1417
1554
        for source in self._fallback_vfs:
1418
1555
            if not keys:
1419
1556
                break
1422
1559
                source_keys.add(key)
1423
1560
                yield line, key
1424
1561
            keys.difference_update(source_keys)
1425
 
        if keys:
1426
 
            raise RevisionNotPresent(keys, self.filename)
1427
1562
        pb.update('Walking content.', total, total)
1428
1563
 
1429
1564
    def _make_line_delta(self, delta_seq, new_content):
1632
1767
        return result
1633
1768
 
1634
1769
 
1635
 
 
1636
1770
class _KndxIndex(object):
1637
1771
    """Manages knit index files
1638
1772
 
1821
1955
                extra information about the content which needs to be passed to
1822
1956
                Factory.parse_record
1823
1957
        """
1824
 
        prefixes = self._partition_keys(keys)
1825
1958
        parent_map = self.get_parent_map(keys)
1826
1959
        result = {}
1827
1960
        for key in keys:
1896
2029
        entry = self._kndx_cache[prefix][0][suffix]
1897
2030
        return key, entry[2], entry[3]
1898
2031
 
 
2032
    has_key = _mod_index._has_key_from_parent_map
 
2033
    
1899
2034
    def _init_index(self, path, extra_lines=[]):
1900
2035
        """Initialize an index."""
1901
2036
        sio = StringIO()
1961
2096
                    del self._filename
1962
2097
                    del self._history
1963
2098
 
 
2099
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2100
 
1964
2101
    def _partition_keys(self, keys):
1965
2102
        """Turn keys into a dict of prefix:suffix_list."""
1966
2103
        result = {}
2005
2142
        else:
2006
2143
            self._mode = 'r'
2007
2144
 
 
2145
    def _sort_keys_by_io(self, keys, positions):
 
2146
        """Figure out an optimal order to read the records for the given keys.
 
2147
 
 
2148
        Sort keys, grouped by index and sorted by position.
 
2149
 
 
2150
        :param keys: A list of keys whose records we want to read. This will be
 
2151
            sorted 'in-place'.
 
2152
        :param positions: A dict, such as the one returned by
 
2153
            _get_components_positions()
 
2154
        :return: None
 
2155
        """
 
2156
        def get_sort_key(key):
 
2157
            index_memo = positions[key][1]
 
2158
            # Group by prefix and position. index_memo[0] is the key, so it is
 
2159
            # (file_id, revision_id) and we don't want to sort on revision_id,
 
2160
            # index_memo[1] is the position, and index_memo[2] is the size,
 
2161
            # which doesn't matter for the sort
 
2162
            return index_memo[0][:-1], index_memo[1]
 
2163
        return keys.sort(key=get_sort_key)
 
2164
 
2008
2165
    def _split_key(self, key):
2009
2166
        """Split key into a prefix and suffix."""
2010
2167
        return key[:-1], key[-1]
2247
2404
        node = self._get_node(key)
2248
2405
        return self._node_to_position(node)
2249
2406
 
 
2407
    has_key = _mod_index._has_key_from_parent_map
 
2408
 
2250
2409
    def keys(self):
2251
2410
        """Get all the keys in the collection.
2252
2411
        
2255
2414
        self._check_read()
2256
2415
        return [node[1] for node in self._graph_index.iter_all_entries()]
2257
2416
    
 
2417
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2418
 
2258
2419
    def _node_to_position(self, node):
2259
2420
        """Convert an index value to position details."""
2260
2421
        bits = node[2][1:].split(' ')
2261
2422
        return node[0], int(bits[0]), int(bits[1])
2262
2423
 
 
2424
    def _sort_keys_by_io(self, keys, positions):
 
2425
        """Figure out an optimal order to read the records for the given keys.
 
2426
 
 
2427
        Sort keys, grouped by index and sorted by position.
 
2428
 
 
2429
        :param keys: A list of keys whose records we want to read. This will be
 
2430
            sorted 'in-place'.
 
2431
        :param positions: A dict, such as the one returned by
 
2432
            _get_components_positions()
 
2433
        :return: None
 
2434
        """
 
2435
        def get_index_memo(key):
 
2436
            # index_memo is at offset [1]. It is made up of (GraphIndex,
 
2437
            # position, size). GI is an object, which will be unique for each
 
2438
            # pack file. This causes us to group by pack file, then sort by
 
2439
            # position. Size doesn't matter, but it isn't worth breaking up the
 
2440
            # tuple.
 
2441
            return positions[key][1]
 
2442
        return keys.sort(key=get_index_memo)
 
2443
 
2263
2444
 
2264
2445
class _KnitKeyAccess(object):
2265
2446
    """Access to records in .knit files."""
2339
2520
class _DirectPackAccess(object):
2340
2521
    """Access to data in one or more packs with less translation."""
2341
2522
 
2342
 
    def __init__(self, index_to_packs):
 
2523
    def __init__(self, index_to_packs, reload_func=None):
2343
2524
        """Create a _DirectPackAccess object.
2344
2525
 
2345
2526
        :param index_to_packs: A dict mapping index objects to the transport
2346
2527
            and file names for obtaining data.
 
2528
        :param reload_func: A function to call if we determine that the pack
 
2529
            files have moved and we need to reload our caches. See
 
2530
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
2347
2531
        """
2348
2532
        self._container_writer = None
2349
2533
        self._write_index = None
2350
2534
        self._indices = index_to_packs
 
2535
        self._reload_func = reload_func
2351
2536
 
2352
2537
    def add_raw_records(self, key_sizes, raw_data):
2353
2538
        """Add raw knit bytes to a storage area.
2399
2584
        if current_index is not None:
2400
2585
            request_lists.append((current_index, current_list))
2401
2586
        for index, offsets in request_lists:
2402
 
            transport, path = self._indices[index]
2403
 
            reader = pack.make_readv_reader(transport, path, offsets)
2404
 
            for names, read_func in reader.iter_records():
2405
 
                yield read_func(None)
 
2587
            try:
 
2588
                transport, path = self._indices[index]
 
2589
            except KeyError:
 
2590
                # A KeyError here indicates that someone has triggered an index
 
2591
                # reload, and this index has gone missing, we need to start
 
2592
                # over.
 
2593
                if self._reload_func is None:
 
2594
                    # If we don't have a _reload_func there is nothing that can
 
2595
                    # be done
 
2596
                    raise
 
2597
                raise errors.RetryWithNewPacks(reload_occurred=True,
 
2598
                                               exc_info=sys.exc_info())
 
2599
            try:
 
2600
                reader = pack.make_readv_reader(transport, path, offsets)
 
2601
                for names, read_func in reader.iter_records():
 
2602
                    yield read_func(None)
 
2603
            except errors.NoSuchFile:
 
2604
                # A NoSuchFile error indicates that a pack file has gone
 
2605
                # missing on disk, we need to trigger a reload, and start over.
 
2606
                if self._reload_func is None:
 
2607
                    raise
 
2608
                raise errors.RetryWithNewPacks(reload_occurred=False,
 
2609
                                               exc_info=sys.exc_info())
2406
2610
 
2407
2611
    def set_writer(self, writer, index, transport_packname):
2408
2612
        """Set a writer to use for adding data."""
2411
2615
        self._container_writer = writer
2412
2616
        self._write_index = index
2413
2617
 
 
2618
    def reload_or_raise(self, retry_exc):
 
2619
        """Try calling the reload function, or re-raise the original exception.
 
2620
 
 
2621
        This should be called after _DirectPackAccess raises a
 
2622
        RetryWithNewPacks exception. This function will handle the common logic
 
2623
        of determining when the error is fatal versus being temporary.
 
2624
        It will also make sure that the original exception is raised, rather
 
2625
        than the RetryWithNewPacks exception.
 
2626
 
 
2627
        If this function returns, then the calling function should retry
 
2628
        whatever operation was being performed. Otherwise an exception will
 
2629
        be raised.
 
2630
 
 
2631
        :param retry_exc: A RetryWithNewPacks exception.
 
2632
        """
 
2633
        is_error = False
 
2634
        if self._reload_func is None:
 
2635
            is_error = True
 
2636
        elif not self._reload_func():
 
2637
            # The reload claimed that nothing changed
 
2638
            if not retry_exc.reload_occurred:
 
2639
                # If there wasn't an earlier reload, then we really were
 
2640
                # expecting to find changes. We didn't find them, so this is a
 
2641
                # hard error
 
2642
                is_error = True
 
2643
        if is_error:
 
2644
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
 
2645
            raise exc_class, exc_value, exc_traceback
 
2646
 
2414
2647
 
2415
2648
# Deprecated, use PatienceSequenceMatcher instead
2416
2649
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
2629
2862
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
2630
2863
                (index_memo, compression_parent, parents,
2631
2864
                 record_details) = self._all_build_details[rev_id]
 
2865
                blocks = None
2632
2866
                if compression_parent is not None:
2633
2867
                    comp_children = self._compression_children[compression_parent]
2634
2868
                    if rev_id not in comp_children:
2655
2889
                        copy_base_content=(not reuse_content))
2656
2890
                    fulltext = self._add_fulltext_content(rev_id,
2657
2891
                                                          fulltext_content)
2658
 
                    blocks = KnitContent.get_line_delta_blocks(delta,
2659
 
                            parent_fulltext, fulltext)
 
2892
                    if compression_parent == parent_ids[0]:
 
2893
                        # the compression_parent is the left parent, so we can
 
2894
                        # re-use the delta
 
2895
                        blocks = KnitContent.get_line_delta_blocks(delta,
 
2896
                                parent_fulltext, fulltext)
2660
2897
                else:
2661
2898
                    fulltext_content = self._knit._factory.parse_fulltext(
2662
2899
                        record, rev_id)
2663
2900
                    fulltext = self._add_fulltext_content(rev_id,
2664
2901
                        fulltext_content)
2665
 
                    blocks = None
2666
2902
                nodes_to_annotate.extend(
2667
2903
                    self._add_annotation(rev_id, fulltext, parent_ids,
2668
2904
                                     left_matching_blocks=blocks))
2683
2919
 
2684
2920
        :param key: The key to annotate.
2685
2921
        """
2686
 
        if True or len(self._knit._fallback_vfs) > 0:
 
2922
        if len(self._knit._fallback_vfs) > 0:
2687
2923
            # stacked knits can't use the fast path at present.
2688
2924
            return self._simple_annotate(key)
2689
 
        records = self._get_build_graph(key)
2690
 
        if key in self._ghosts:
2691
 
            raise errors.RevisionNotPresent(key, self._knit)
2692
 
        self._annotate_records(records)
2693
 
        return self._annotated_lines[key]
 
2925
        while True:
 
2926
            try:
 
2927
                records = self._get_build_graph(key)
 
2928
                if key in self._ghosts:
 
2929
                    raise errors.RevisionNotPresent(key, self._knit)
 
2930
                self._annotate_records(records)
 
2931
                return self._annotated_lines[key]
 
2932
            except errors.RetryWithNewPacks, e:
 
2933
                self._knit._access.reload_or_raise(e)
 
2934
                # The cached build_details are no longer valid
 
2935
                self._all_build_details.clear()
2694
2936
 
2695
2937
    def _simple_annotate(self, key):
2696
2938
        """Return annotated fulltext, rediffing from the full texts.
2716
2958
        reannotate = annotate.reannotate
2717
2959
        for record in self._knit.get_record_stream(keys, 'topological', True):
2718
2960
            key = record.key
2719
 
            fulltext = split_lines(record.get_bytes_as('fulltext'))
 
2961
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
2720
2962
            parents = parent_map[key]
2721
2963
            if parents is not None:
2722
2964
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]