/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Aaron Bentley
  • Date: 2008-12-03 04:23:21 UTC
  • mfrom: (3878 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3892.
  • Revision ID: aaron@aaronbentley.com-20081203042321-kr5k4mdhmdvl3553
Merge with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
64
64
from itertools import izip, chain
65
65
import operator
66
66
import os
 
67
import sys
67
68
 
68
69
from bzrlib.lazy_import import lazy_import
69
70
lazy_import(globals(), """
707
708
    """
708
709
 
709
710
    def __init__(self, index, data_access, max_delta_chain=200,
710
 
        annotated=False):
 
711
                 annotated=False, reload_func=None):
711
712
        """Create a KnitVersionedFiles with index and data_access.
712
713
 
713
714
        :param index: The index for the knit data.
717
718
            insertion. Set to 0 to prohibit the use of deltas.
718
719
        :param annotated: Set to True to cause annotations to be calculated and
719
720
            stored during insertion.
 
721
        :param reload_func: An function that can be called if we think we need
 
722
            to reload the pack listing and try again. See
 
723
            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
720
724
        """
721
725
        self._index = index
722
726
        self._access = data_access
726
730
        else:
727
731
            self._factory = KnitPlainFactory()
728
732
        self._fallback_vfs = []
 
733
        self._reload_func = reload_func
729
734
 
730
735
    def __repr__(self):
731
736
        return "%s(%r, %r)" % (
770
775
        present_parents = []
771
776
        if parent_texts is None:
772
777
            parent_texts = {}
773
 
        # Do a single query to ascertain parent presence.
774
 
        present_parent_map = self.get_parent_map(parents)
 
778
        # Do a single query to ascertain parent presence; we only compress
 
779
        # against parents in the same kvf.
 
780
        present_parent_map = self._index.get_parent_map(parents)
775
781
        for parent in parents:
776
782
            if parent in present_parent_map:
777
783
                present_parents.append(parent)
1108
1114
        :param allow_missing: If some records are missing, rather than 
1109
1115
            error, just return the data that could be generated.
1110
1116
        """
1111
 
        position_map = self._get_components_positions(keys,
1112
 
            allow_missing=allow_missing)
1113
 
        # key = component_id, r = record_details, i_m = index_memo, n = next
1114
 
        records = [(key, i_m) for key, (r, i_m, n)
1115
 
                             in position_map.iteritems()]
1116
 
        record_map = {}
1117
 
        for key, record, digest in \
1118
 
                self._read_records_iter(records):
1119
 
            (record_details, index_memo, next) = position_map[key]
1120
 
            record_map[key] = record, record_details, digest, next
1121
 
        return record_map
 
1117
        # This retries the whole request if anything fails. Potentially we
 
1118
        # could be a bit more selective. We could track the keys whose records
 
1119
        # we have successfully found, and then only request the new records
 
1120
        # from there. However, _get_components_positions grabs the whole build
 
1121
        # chain, which means we'll likely try to grab the same records again
 
1122
        # anyway. Also, can the build chains change as part of a pack
 
1123
        # operation? We wouldn't want to end up with a broken chain.
 
1124
        while True:
 
1125
            try:
 
1126
                position_map = self._get_components_positions(keys,
 
1127
                    allow_missing=allow_missing)
 
1128
                # key = component_id, r = record_details, i_m = index_memo,
 
1129
                # n = next
 
1130
                records = [(key, i_m) for key, (r, i_m, n)
 
1131
                                       in position_map.iteritems()]
 
1132
                record_map = {}
 
1133
                for key, record, digest in self._read_records_iter(records):
 
1134
                    (record_details, index_memo, next) = position_map[key]
 
1135
                    record_map[key] = record, record_details, digest, next
 
1136
                return record_map
 
1137
            except errors.RetryWithNewPacks, e:
 
1138
                self._access.reload_or_raise(e)
1122
1139
 
1123
1140
    def _split_by_prefix(self, keys):
1124
1141
        """For the given keys, split them up based on their prefix.
1159
1176
        if not self._index.has_graph:
1160
1177
            # Cannot topological order when no graph has been stored.
1161
1178
            ordering = 'unordered'
 
1179
 
 
1180
        remaining_keys = keys
 
1181
        while True:
 
1182
            try:
 
1183
                keys = set(remaining_keys)
 
1184
                for content_factory in self._get_remaining_record_stream(keys,
 
1185
                                            ordering, include_delta_closure):
 
1186
                    remaining_keys.discard(content_factory.key)
 
1187
                    yield content_factory
 
1188
                return
 
1189
            except errors.RetryWithNewPacks, e:
 
1190
                self._access.reload_or_raise(e)
 
1191
 
 
1192
    def _get_remaining_record_stream(self, keys, ordering,
 
1193
                                     include_delta_closure):
 
1194
        """This function is the 'retry' portion for get_record_stream."""
1162
1195
        if include_delta_closure:
1163
1196
            positions = self._get_components_positions(keys, allow_missing=True)
1164
1197
        else:
1323
1356
        # can't generate annotations from new deltas until their basis parent
1324
1357
        # is present anyway, so we get away with not needing an index that
1325
1358
        # includes the new keys.
 
1359
        #
 
1360
        # See <http://launchpad.net/bugs/300177> about ordering of compression
 
1361
        # parents in the records - to be conservative, we insist that all
 
1362
        # parents must be present to avoid expanding to a fulltext.
 
1363
        #
1326
1364
        # key = basis_parent, value = index entry to add
1327
1365
        buffered_index_entries = {}
1328
1366
        for record in stream:
1330
1368
            # Raise an error when a record is missing.
1331
1369
            if record.storage_kind == 'absent':
1332
1370
                raise RevisionNotPresent([record.key], self)
1333
 
            if record.storage_kind in knit_types:
 
1371
            elif ((record.storage_kind in knit_types)
 
1372
                  and (not parents
 
1373
                       or not self._fallback_vfs
 
1374
                       or not self._index.missing_keys(parents)
 
1375
                       or self.missing_keys(parents))):
 
1376
                # we can insert the knit record literally if either it has no
 
1377
                # compression parent OR we already have its basis in this kvf
 
1378
                # OR the basis is not present even in the fallbacks.  In the
 
1379
                # last case it will either turn up later in the stream and all
 
1380
                # will be well, or it won't turn up at all and we'll raise an
 
1381
                # error at the end.
 
1382
                #
 
1383
                # TODO: self.has_key is somewhat redundant with
 
1384
                # self._index.has_key; we really want something that directly
 
1385
                # asks if it's only present in the fallbacks. -- mbp 20081119
1334
1386
                if record.storage_kind not in native_types:
1335
1387
                    try:
1336
1388
                        adapter_key = (record.storage_kind, "knit-delta-gz")
1358
1410
                index_entry = (record.key, options, access_memo, parents)
1359
1411
                buffered = False
1360
1412
                if 'fulltext' not in options:
1361
 
                    basis_parent = parents[0]
 
1413
                    # Not a fulltext, so we need to make sure the compression
 
1414
                    # parent will also be present.
1362
1415
                    # Note that pack backed knits don't need to buffer here
1363
1416
                    # because they buffer all writes to the transaction level,
1364
1417
                    # but we don't expose that difference at the index level. If
1365
1418
                    # the query here has sufficient cost to show up in
1366
1419
                    # profiling we should do that.
1367
 
                    if basis_parent not in self.get_parent_map([basis_parent]):
 
1420
                    #
 
1421
                    # They're required to be physically in this
 
1422
                    # KnitVersionedFiles, not in a fallback.
 
1423
                    compression_parent = parents[0]
 
1424
                    if self.missing_keys([compression_parent]):
1368
1425
                        pending = buffered_index_entries.setdefault(
1369
 
                            basis_parent, [])
 
1426
                            compression_parent, [])
1370
1427
                        pending.append(index_entry)
1371
1428
                        buffered = True
1372
1429
                if not buffered:
1375
1432
                self.add_lines(record.key, parents,
1376
1433
                    split_lines(record.get_bytes_as('fulltext')))
1377
1434
            else:
 
1435
                # Not a fulltext, and not suitable for direct insertion as a
 
1436
                # delta, either because it's not the right format, or this
 
1437
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
 
1438
                # 0) or because it depends on a base only present in the
 
1439
                # fallback kvfs.
1378
1440
                adapter_key = record.storage_kind, 'fulltext'
1379
1441
                adapter = get_adapter(adapter_key)
1380
1442
                lines = split_lines(adapter.get_bytes(
1395
1457
                    del buffered_index_entries[key]
1396
1458
        # If there were any deltas which had a missing basis parent, error.
1397
1459
        if buffered_index_entries:
1398
 
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
1399
 
                self)
 
1460
            from pprint import pformat
 
1461
            raise errors.BzrCheckError(
 
1462
                "record_stream refers to compression parents not in %r:\n%s"
 
1463
                % (self, pformat(sorted(buffered_index_entries.keys()))))
1400
1464
 
1401
1465
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1402
1466
        """Iterate over the lines in the versioned files from keys.
1413
1477
        is an iterator).
1414
1478
 
1415
1479
        NOTES:
1416
 
         * Lines are normalised by the underlying store: they will all have \n
 
1480
         * Lines are normalised by the underlying store: they will all have \\n
1417
1481
           terminators.
1418
1482
         * Lines are returned in arbitrary order.
 
1483
         * If a requested key did not change any lines (or didn't have any
 
1484
           lines), it may not be mentioned at all in the result.
1419
1485
 
1420
1486
        :return: An iterator over (line, key).
1421
1487
        """
1423
1489
            pb = progress.DummyProgress()
1424
1490
        keys = set(keys)
1425
1491
        total = len(keys)
1426
 
        # we don't care about inclusions, the caller cares.
1427
 
        # but we need to setup a list of records to visit.
1428
 
        # we need key, position, length
1429
 
        key_records = []
1430
 
        build_details = self._index.get_build_details(keys)
1431
 
        for key, details in build_details.iteritems():
1432
 
            if key in keys:
1433
 
                key_records.append((key, details[0]))
1434
 
                keys.remove(key)
1435
 
        records_iter = enumerate(self._read_records_iter(key_records))
1436
 
        for (key_idx, (key, data, sha_value)) in records_iter:
1437
 
            pb.update('Walking content.', key_idx, total)
1438
 
            compression_parent = build_details[key][1]
1439
 
            if compression_parent is None:
1440
 
                # fulltext
1441
 
                line_iterator = self._factory.get_fulltext_content(data)
1442
 
            else:
1443
 
                # Delta 
1444
 
                line_iterator = self._factory.get_linedelta_content(data)
1445
 
            # XXX: It might be more efficient to yield (key,
1446
 
            # line_iterator) in the future. However for now, this is a simpler
1447
 
            # change to integrate into the rest of the codebase. RBC 20071110
1448
 
            for line in line_iterator:
1449
 
                yield line, key
 
1492
        done = False
 
1493
        while not done:
 
1494
            try:
 
1495
                # we don't care about inclusions, the caller cares.
 
1496
                # but we need to setup a list of records to visit.
 
1497
                # we need key, position, length
 
1498
                key_records = []
 
1499
                build_details = self._index.get_build_details(keys)
 
1500
                for key, details in build_details.iteritems():
 
1501
                    if key in keys:
 
1502
                        key_records.append((key, details[0]))
 
1503
                records_iter = enumerate(self._read_records_iter(key_records))
 
1504
                for (key_idx, (key, data, sha_value)) in records_iter:
 
1505
                    pb.update('Walking content.', key_idx, total)
 
1506
                    compression_parent = build_details[key][1]
 
1507
                    if compression_parent is None:
 
1508
                        # fulltext
 
1509
                        line_iterator = self._factory.get_fulltext_content(data)
 
1510
                    else:
 
1511
                        # Delta 
 
1512
                        line_iterator = self._factory.get_linedelta_content(data)
 
1513
                    # Now that we are yielding the data for this key, remove it
 
1514
                    # from the list
 
1515
                    keys.remove(key)
 
1516
                    # XXX: It might be more efficient to yield (key,
 
1517
                    # line_iterator) in the future. However for now, this is a
 
1518
                    # simpler change to integrate into the rest of the
 
1519
                    # codebase. RBC 20071110
 
1520
                    for line in line_iterator:
 
1521
                        yield line, key
 
1522
                done = True
 
1523
            except errors.RetryWithNewPacks, e:
 
1524
                self._access.reload_or_raise(e)
 
1525
        # If there are still keys we've not yet found, we look in the fallback
 
1526
        # vfs, and hope to find them there.  Note that if the keys are found
 
1527
        # but had no changes or no content, the fallback may not return
 
1528
        # anything.  
 
1529
        if keys and not self._fallback_vfs:
 
1530
            # XXX: strictly the second parameter is meant to be the file id
 
1531
            # but it's not easily accessible here.
 
1532
            raise RevisionNotPresent(keys, repr(self))
1450
1533
        for source in self._fallback_vfs:
1451
1534
            if not keys:
1452
1535
                break
1455
1538
                source_keys.add(key)
1456
1539
                yield line, key
1457
1540
            keys.difference_update(source_keys)
1458
 
        if keys:
1459
 
            # XXX: strictly the second parameter is meant to be the file id
1460
 
            # but it's not easily accessible here.
1461
 
            raise RevisionNotPresent(keys, repr(self))
1462
1541
        pb.update('Walking content.', total, total)
1463
1542
 
1464
1543
    def _make_line_delta(self, delta_seq, new_content):
1855
1934
                extra information about the content which needs to be passed to
1856
1935
                Factory.parse_record
1857
1936
        """
1858
 
        prefixes = self._partition_keys(keys)
1859
1937
        parent_map = self.get_parent_map(keys)
1860
1938
        result = {}
1861
1939
        for key in keys:
1930
2008
        entry = self._kndx_cache[prefix][0][suffix]
1931
2009
        return key, entry[2], entry[3]
1932
2010
 
 
2011
    has_key = _mod_index._has_key_from_parent_map
 
2012
    
1933
2013
    def _init_index(self, path, extra_lines=[]):
1934
2014
        """Initialize an index."""
1935
2015
        sio = StringIO()
1995
2075
                    del self._filename
1996
2076
                    del self._history
1997
2077
 
 
2078
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2079
 
1998
2080
    def _partition_keys(self, keys):
1999
2081
        """Turn keys into a dict of prefix:suffix_list."""
2000
2082
        result = {}
2281
2363
        node = self._get_node(key)
2282
2364
        return self._node_to_position(node)
2283
2365
 
 
2366
    has_key = _mod_index._has_key_from_parent_map
 
2367
 
2284
2368
    def keys(self):
2285
2369
        """Get all the keys in the collection.
2286
2370
        
2289
2373
        self._check_read()
2290
2374
        return [node[1] for node in self._graph_index.iter_all_entries()]
2291
2375
    
 
2376
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2377
 
2292
2378
    def _node_to_position(self, node):
2293
2379
        """Convert an index value to position details."""
2294
2380
        bits = node[2][1:].split(' ')
2373
2459
class _DirectPackAccess(object):
2374
2460
    """Access to data in one or more packs with less translation."""
2375
2461
 
2376
 
    def __init__(self, index_to_packs):
 
2462
    def __init__(self, index_to_packs, reload_func=None):
2377
2463
        """Create a _DirectPackAccess object.
2378
2464
 
2379
2465
        :param index_to_packs: A dict mapping index objects to the transport
2380
2466
            and file names for obtaining data.
 
2467
        :param reload_func: A function to call if we determine that the pack
 
2468
            files have moved and we need to reload our caches. See
 
2469
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
2381
2470
        """
2382
2471
        self._container_writer = None
2383
2472
        self._write_index = None
2384
2473
        self._indices = index_to_packs
 
2474
        self._reload_func = reload_func
2385
2475
 
2386
2476
    def add_raw_records(self, key_sizes, raw_data):
2387
2477
        """Add raw knit bytes to a storage area.
2433
2523
        if current_index is not None:
2434
2524
            request_lists.append((current_index, current_list))
2435
2525
        for index, offsets in request_lists:
2436
 
            transport, path = self._indices[index]
2437
 
            reader = pack.make_readv_reader(transport, path, offsets)
2438
 
            for names, read_func in reader.iter_records():
2439
 
                yield read_func(None)
 
2526
            try:
 
2527
                transport, path = self._indices[index]
 
2528
            except KeyError:
 
2529
                # A KeyError here indicates that someone has triggered an index
 
2530
                # reload, and this index has gone missing, we need to start
 
2531
                # over.
 
2532
                if self._reload_func is None:
 
2533
                    # If we don't have a _reload_func there is nothing that can
 
2534
                    # be done
 
2535
                    raise
 
2536
                raise errors.RetryWithNewPacks(reload_occurred=True,
 
2537
                                               exc_info=sys.exc_info())
 
2538
            try:
 
2539
                reader = pack.make_readv_reader(transport, path, offsets)
 
2540
                for names, read_func in reader.iter_records():
 
2541
                    yield read_func(None)
 
2542
            except errors.NoSuchFile:
 
2543
                # A NoSuchFile error indicates that a pack file has gone
 
2544
                # missing on disk, we need to trigger a reload, and start over.
 
2545
                if self._reload_func is None:
 
2546
                    raise
 
2547
                raise errors.RetryWithNewPacks(reload_occurred=False,
 
2548
                                               exc_info=sys.exc_info())
2440
2549
 
2441
2550
    def set_writer(self, writer, index, transport_packname):
2442
2551
        """Set a writer to use for adding data."""
2445
2554
        self._container_writer = writer
2446
2555
        self._write_index = index
2447
2556
 
 
2557
    def reload_or_raise(self, retry_exc):
 
2558
        """Try calling the reload function, or re-raise the original exception.
 
2559
 
 
2560
        This should be called after _DirectPackAccess raises a
 
2561
        RetryWithNewPacks exception. This function will handle the common logic
 
2562
        of determining when the error is fatal versus being temporary.
 
2563
        It will also make sure that the original exception is raised, rather
 
2564
        than the RetryWithNewPacks exception.
 
2565
 
 
2566
        If this function returns, then the calling function should retry
 
2567
        whatever operation was being performed. Otherwise an exception will
 
2568
        be raised.
 
2569
 
 
2570
        :param retry_exc: A RetryWithNewPacks exception.
 
2571
        """
 
2572
        is_error = False
 
2573
        if self._reload_func is None:
 
2574
            is_error = True
 
2575
        elif not self._reload_func():
 
2576
            # The reload claimed that nothing changed
 
2577
            if not retry_exc.reload_occurred:
 
2578
                # If there wasn't an earlier reload, then we really were
 
2579
                # expecting to find changes. We didn't find them, so this is a
 
2580
                # hard error
 
2581
                is_error = True
 
2582
        if is_error:
 
2583
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
 
2584
            raise exc_class, exc_value, exc_traceback
 
2585
 
2448
2586
 
2449
2587
# Deprecated, use PatienceSequenceMatcher instead
2450
2588
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
2723
2861
        if len(self._knit._fallback_vfs) > 0:
2724
2862
            # stacked knits can't use the fast path at present.
2725
2863
            return self._simple_annotate(key)
2726
 
        records = self._get_build_graph(key)
2727
 
        if key in self._ghosts:
2728
 
            raise errors.RevisionNotPresent(key, self._knit)
2729
 
        self._annotate_records(records)
2730
 
        return self._annotated_lines[key]
 
2864
        while True:
 
2865
            try:
 
2866
                records = self._get_build_graph(key)
 
2867
                if key in self._ghosts:
 
2868
                    raise errors.RevisionNotPresent(key, self._knit)
 
2869
                self._annotate_records(records)
 
2870
                return self._annotated_lines[key]
 
2871
            except errors.RetryWithNewPacks, e:
 
2872
                self._knit._access.reload_or_raise(e)
 
2873
                # The cached build_details are no longer valid
 
2874
                self._all_build_details.clear()
2731
2875
 
2732
2876
    def _simple_annotate(self, key):
2733
2877
        """Return annotated fulltext, rediffing from the full texts.