/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Aaron Bentley
  • Date: 2007-08-14 23:35:48 UTC
  • mfrom: (2698 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2699.
  • Revision ID: aaron.bentley@utoronto.ca-20070814233548-ctlr8sb1lcufb3ny
Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
70
70
import warnings
71
71
 
72
72
import bzrlib
 
73
from bzrlib.lazy_import import lazy_import
 
74
lazy_import(globals(), """
 
75
from bzrlib import (
 
76
    pack,
 
77
    )
 
78
""")
73
79
from bzrlib import (
74
80
    cache_utf8,
75
81
    diff,
340
346
def make_empty_knit(transport, relpath):
341
347
    """Construct a empty knit at the specified location."""
342
348
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
343
 
    k._data._open_file()
344
349
 
345
350
 
346
351
class KnitVersionedFile(VersionedFile):
361
366
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
362
367
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
363
368
                 create=False, create_parent_dir=False, delay_create=False,
364
 
                 dir_mode=None, index=None):
 
369
                 dir_mode=None, index=None, access_method=None):
365
370
        """Construct a knit at location specified by relpath.
366
371
        
367
372
        :param create: If not True, only open an existing knit.
395
400
                dir_mode=dir_mode)
396
401
        else:
397
402
            self._index = index
398
 
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
399
 
            access_mode, create=create and not len(self), file_mode=file_mode,
400
 
            create_parent_dir=create_parent_dir, delay_create=delay_create,
401
 
            dir_mode=dir_mode)
 
403
        if access_method is None:
 
404
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
 
405
                ((create and not len(self)) and delay_create), create_parent_dir)
 
406
        else:
 
407
            _access = access_method
 
408
        if create and not len(self) and not delay_create:
 
409
            _access.create()
 
410
        self._data = _KnitData(_access)
402
411
 
403
412
    def __repr__(self):
404
413
        return '%s(%s)' % (self.__class__.__name__, 
421
430
        for count in xrange(self._max_delta_chain):
422
431
            parent = delta_parents[0]
423
432
            method = self._index.get_method(parent)
424
 
            pos, size = self._index.get_position(parent)
 
433
            index, pos, size = self._index.get_position(parent)
425
434
            if method == 'fulltext':
426
435
                fulltext_size = size
427
436
                break
485
494
        options.append('line-delta')
486
495
        store_lines = self.factory.lower_line_delta(delta)
487
496
 
488
 
        where, size = self._data.add_record(version_id, digest, store_lines)
489
 
        self._index.add_version(version_id, options, where, size, parents)
 
497
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
498
        self._index.add_version(version_id, options, access_memo, parents)
490
499
 
491
500
    def _add_raw_records(self, records, data):
492
501
        """Add all the records 'records' with data pre-joined in 'data'.
497
506
                     the preceding records sizes.
498
507
        """
499
508
        # write all the data
500
 
        pos = self._data.add_raw_record(data)
 
509
        raw_record_sizes = [record[3] for record in records]
 
510
        positions = self._data.add_raw_records(raw_record_sizes, data)
501
511
        offset = 0
502
512
        index_entries = []
503
 
        for (version_id, options, parents, size) in records:
504
 
            index_entries.append((version_id, options, pos+offset,
505
 
                                  size, parents))
 
513
        for (version_id, options, parents, size), access_memo in zip(
 
514
            records, positions):
 
515
            index_entries.append((version_id, options, access_memo, parents))
506
516
            if self._data._do_cache:
507
517
                self._data._cache[version_id] = data[offset:offset+size]
508
518
            offset += size
546
556
        current_values = self._index._cache[version_id]
547
557
        assert set(current_values[4]).difference(set(new_parents)) == set()
548
558
        self._index.add_version(version_id,
549
 
                                current_values[1], 
550
 
                                current_values[2],
551
 
                                current_values[3],
 
559
                                current_values[1],
 
560
                                (None, current_values[2], current_values[3]),
552
561
                                new_parents)
553
562
 
554
563
    def _extract_blocks(self, version_id, source, target):
569
578
            parent = parents[0]
570
579
        else:
571
580
            parent = None
572
 
        data_pos, data_size = self._index.get_position(version_id)
573
 
        data, sha1 = self._data.read_records(((version_id, data_pos, data_size),))[version_id]
 
581
        index_memo = self._index.get_position(version_id)
 
582
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
574
583
        noeol = 'no-eol' in self._index.get_options(version_id)
575
584
        if 'fulltext' == self._index.get_method(version_id):
576
585
            new_content = self.factory.parse_fulltext(data, version_id)
697
706
                    next = None
698
707
                else:
699
708
                    next = self.get_parents(cursor)[0]
700
 
                data_pos, data_size = self._index.get_position(cursor)
701
 
                component_data[cursor] = (method, data_pos, data_size, next)
 
709
                index_memo = self._index.get_position(cursor)
 
710
                component_data[cursor] = (method, index_memo, next)
702
711
                cursor = next
703
712
        return component_data
704
713
       
806
815
            options.append('fulltext')
807
816
            store_lines = self.factory.lower_fulltext(lines)
808
817
 
809
 
        where, size = self._data.add_record(version_id, digest, store_lines)
810
 
        self._index.add_version(version_id, options, where, size, parents)
 
818
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
819
        self._index.add_version(version_id, options, access_memo, parents)
811
820
        return lines
812
821
 
813
822
    def check(self, progress_bar=None):
835
844
        If the method is fulltext, next will be None.
836
845
        """
837
846
        position_map = self._get_components_positions(version_ids)
838
 
        # c = component_id, m = method, p = position, s = size, n = next
839
 
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
 
847
        # c = component_id, m = method, i_m = index_memo, n = next
 
848
        records = [(c, i_m) for c, (m, i_m, n) in position_map.iteritems()]
840
849
        record_map = {}
841
850
        for component_id, content, digest in \
842
851
                self._data.read_records_iter(records):
843
 
            method, position, size, next = position_map[component_id]
 
852
            method, index_memo, next = position_map[component_id]
844
853
            record_map[component_id] = method, content, digest, next
845
854
                          
846
855
        return record_map
938
947
        # get a in-component-order queue:
939
948
        for version_id in self.versions():
940
949
            if version_id in requested_versions:
941
 
                data_pos, length = self._index.get_position(version_id)
942
 
                version_id_records.append((version_id, data_pos, length))
 
950
                index_memo = self._index.get_position(version_id)
 
951
                version_id_records.append((version_id, index_memo))
943
952
 
944
953
        total = len(version_id_records)
945
954
        for version_idx, (version_id, data, sha_value) in \
1081
1090
            raise KnitHeaderError(badline=line,
1082
1091
                              filename=self._transport.abspath(self._filename))
1083
1092
 
1084
 
    def commit(self):
1085
 
        """Commit is a nop."""
1086
 
 
1087
1093
    def __repr__(self):
1088
1094
        return '%s(%s)' % (self.__class__.__name__, self._filename)
1089
1095
 
1276
1282
                result_list.append('.' + version)
1277
1283
        return ' '.join(result_list)
1278
1284
 
1279
 
    def add_version(self, version_id, options, pos, size, parents):
 
1285
    def add_version(self, version_id, options, index_memo, parents):
1280
1286
        """Add a version record to the index."""
1281
 
        self.add_versions(((version_id, options, pos, size, parents),))
 
1287
        self.add_versions(((version_id, options, index_memo, parents),))
1282
1288
 
1283
1289
    def add_versions(self, versions):
1284
1290
        """Add multiple versions to the index.
1291
1297
        orig_cache = self._cache.copy()
1292
1298
 
1293
1299
        try:
1294
 
            for version_id, options, pos, size, parents in versions:
 
1300
            for version_id, options, (index, pos, size), parents in versions:
1295
1301
                line = "\n%s %s %s %s %s :" % (version_id,
1296
1302
                                               ','.join(options),
1297
1303
                                               pos,
1324
1330
        return version_id in self._cache
1325
1331
 
1326
1332
    def get_position(self, version_id):
1327
 
        """Return data position and size of specified version."""
 
1333
        """Return details needed to access the version.
 
1334
        
 
1335
        .kndx indices do not support split-out data, so return None for the 
 
1336
        index field.
 
1337
 
 
1338
        :return: a tuple (None, data position, size) to hand to the access
 
1339
            logic to get the record.
 
1340
        """
1328
1341
        entry = self._cache[version_id]
1329
 
        return entry[2], entry[3]
 
1342
        return None, entry[2], entry[3]
1330
1343
 
1331
1344
    def get_method(self, version_id):
1332
1345
        """Return compression method of specified version."""
1384
1397
            raise KnitCorrupt(self, "Cannot do delta compression without "
1385
1398
                "parent tracking.")
1386
1399
 
1387
 
    def _get_entries(self, version_ids, check_present=False):
1388
 
        """Get the entries for version_ids."""
1389
 
        version_ids = set(version_ids)
 
1400
    def _get_entries(self, keys, check_present=False):
 
1401
        """Get the entries for keys.
 
1402
        
 
1403
        :param keys: An iterable of index keys, - 1-tuples.
 
1404
        """
 
1405
        keys = set(keys)
1390
1406
        found_keys = set()
1391
1407
        if self._parents:
1392
 
            for node in self._graph_index.iter_entries(version_ids):
 
1408
            for node in self._graph_index.iter_entries(keys):
1393
1409
                yield node
1394
 
                found_keys.add(node[0])
 
1410
                found_keys.add(node[1])
1395
1411
        else:
1396
1412
            # adapt parentless index to the rest of the code.
1397
 
            for node in self._graph_index.iter_entries(version_ids):
1398
 
                yield node[0], node[1], ()
1399
 
                found_keys.add(node[0])
 
1413
            for node in self._graph_index.iter_entries(keys):
 
1414
                yield node[0], node[1], node[2], ()
 
1415
                found_keys.add(node[1])
1400
1416
        if check_present:
1401
 
            missing_keys = version_ids.difference(found_keys)
 
1417
            missing_keys = keys.difference(found_keys)
1402
1418
            if missing_keys:
1403
1419
                raise RevisionNotPresent(missing_keys.pop(), self)
1404
1420
 
1405
1421
    def _present_keys(self, version_ids):
1406
1422
        return set([
1407
 
            node[0] for node in self._get_entries(version_ids)])
 
1423
            node[1] for node in self._get_entries(version_ids)])
1408
1424
 
1409
1425
    def _parentless_ancestry(self, versions):
1410
1426
        """Honour the get_ancestry API for parentless knit indices."""
1411
 
        present_keys = self._present_keys(versions)
1412
 
        missing = set(versions).difference(present_keys)
 
1427
        wanted_keys = self._version_ids_to_keys(versions)
 
1428
        present_keys = self._present_keys(wanted_keys)
 
1429
        missing = set(wanted_keys).difference(present_keys)
1413
1430
        if missing:
1414
1431
            raise RevisionNotPresent(missing.pop(), self)
1415
 
        return list(present_keys)
 
1432
        return list(self._keys_to_version_ids(present_keys))
1416
1433
 
1417
1434
    def get_ancestry(self, versions, topo_sorted=True):
1418
1435
        """See VersionedFile.get_ancestry."""
1423
1440
        # get a graph of all the mentioned versions:
1424
1441
        graph = {}
1425
1442
        ghosts = set()
1426
 
        versions = set(versions)
 
1443
        versions = self._version_ids_to_keys(versions)
1427
1444
        pending = set(versions)
1428
1445
        while pending:
1429
1446
            # get all pending nodes
1430
1447
            this_iteration = pending
1431
1448
            new_nodes = self._get_entries(this_iteration)
 
1449
            found = set()
1432
1450
            pending = set()
1433
 
            for (key, value, node_refs) in new_nodes:
 
1451
            for (index, key, value, node_refs) in new_nodes:
1434
1452
                # dont ask for ghosties - otherwise
1435
1453
                # we we can end up looping with pending
1436
1454
                # being entirely ghosted.
1437
1455
                graph[key] = [parent for parent in node_refs[0]
1438
1456
                    if parent not in ghosts]
1439
 
                # queue parents 
1440
 
                pending.update(graph[key])
1441
 
            ghosts.difference_update(graph)
1442
 
            # dont examine known nodes
1443
 
            pending.difference_update(graph)
 
1457
                # queue parents
 
1458
                for parent in graph[key]:
 
1459
                    # dont examine known nodes again
 
1460
                    if parent in graph:
 
1461
                        continue
 
1462
                    pending.add(parent)
 
1463
                found.add(key)
 
1464
            ghosts.update(this_iteration.difference(found))
1444
1465
        if versions.difference(graph):
1445
1466
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
1446
 
        if not topo_sorted:
1447
 
            return graph.keys()
1448
 
        return topo_sort(graph.items())
 
1467
        if topo_sorted:
 
1468
            result_keys = topo_sort(graph.items())
 
1469
        else:
 
1470
            result_keys = graph.iterkeys()
 
1471
        return [key[0] for key in result_keys]
1449
1472
 
1450
1473
    def get_ancestry_with_ghosts(self, versions):
1451
1474
        """See VersionedFile.get_ancestry."""
1455
1478
        # it should be altered to be a index core feature?
1456
1479
        # get a graph of all the mentioned versions:
1457
1480
        graph = {}
1458
 
        versions = set(versions)
 
1481
        versions = self._version_ids_to_keys(versions)
1459
1482
        pending = set(versions)
1460
1483
        while pending:
1461
1484
            # get all pending nodes
1462
1485
            this_iteration = pending
1463
1486
            new_nodes = self._get_entries(this_iteration)
1464
1487
            pending = set()
1465
 
            for (key, value, node_refs) in new_nodes:
 
1488
            for (index, key, value, node_refs) in new_nodes:
1466
1489
                graph[key] = node_refs[0]
1467
1490
                # queue parents 
1468
 
                pending.update(graph[key])
 
1491
                for parent in graph[key]:
 
1492
                    # dont examine known nodes again
 
1493
                    if parent in graph:
 
1494
                        continue
 
1495
                    pending.add(parent)
1469
1496
            missing_versions = this_iteration.difference(graph)
1470
1497
            missing_needed = versions.intersection(missing_versions)
1471
1498
            if missing_needed:
1473
1500
            for missing_version in missing_versions:
1474
1501
                # add a key, no parents
1475
1502
                graph[missing_version] = []
1476
 
            # dont examine known nodes
1477
 
            pending.difference_update(graph)
1478
 
        return topo_sort(graph.items())
 
1503
                pending.discard(missing_version) # don't look for it
 
1504
        result_keys = topo_sort(graph.items())
 
1505
        return [key[0] for key in result_keys]
1479
1506
 
1480
1507
    def get_graph(self):
1481
1508
        """Return a list of the node:parents lists from this knit index."""
1482
1509
        if not self._parents:
1483
1510
            return [(key, ()) for key in self.get_versions()]
1484
 
        return [(key, refs[0]) for (key, value, refs) in 
1485
 
            self._graph_index.iter_all_entries()]
 
1511
        result = []
 
1512
        for index, key, value, refs in self._graph_index.iter_all_entries():
 
1513
            result.append((key[0], tuple([ref[0] for ref in refs[0]])))
 
1514
        return result
1486
1515
 
1487
1516
    def iter_parents(self, version_ids):
1488
1517
        """Iterate through the parents for many version ids.
1494
1523
            the underlying implementation.
1495
1524
        """
1496
1525
        if self._parents:
1497
 
            all_nodes = set(self._get_entries(version_ids))
 
1526
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
1498
1527
            all_parents = set()
1499
1528
            present_parents = set()
1500
1529
            for node in all_nodes:
1501
 
                all_parents.update(node[2][0])
 
1530
                all_parents.update(node[3][0])
1502
1531
                # any node we are querying must be present
1503
 
                present_parents.add(node[0])
 
1532
                present_parents.add(node[1])
1504
1533
            unknown_parents = all_parents.difference(present_parents)
1505
1534
            present_parents.update(self._present_keys(unknown_parents))
1506
1535
            for node in all_nodes:
1507
1536
                parents = []
1508
 
                for parent in node[2][0]:
 
1537
                for parent in node[3][0]:
1509
1538
                    if parent in present_parents:
1510
 
                        parents.append(parent)
1511
 
                yield node[0], tuple(parents)
 
1539
                        parents.append(parent[0])
 
1540
                yield node[1][0], tuple(parents)
1512
1541
        else:
1513
 
            for node in self._get_entries(version_ids):
1514
 
                yield node[0], ()
 
1542
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
 
1543
                yield node[1][0], ()
1515
1544
 
1516
1545
    def num_versions(self):
1517
1546
        return len(list(self._graph_index.iter_all_entries()))
1520
1549
 
1521
1550
    def get_versions(self):
1522
1551
        """Get all the versions in the file. not topologically sorted."""
1523
 
        return [node[0] for node in self._graph_index.iter_all_entries()]
 
1552
        return [node[1][0] for node in self._graph_index.iter_all_entries()]
1524
1553
    
1525
1554
    def has_version(self, version_id):
1526
1555
        """True if the version is in the index."""
1527
 
        return len(self._present_keys([version_id])) == 1
 
1556
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1
 
1557
 
 
1558
    def _keys_to_version_ids(self, keys):
 
1559
        return tuple(key[0] for key in keys)
1528
1560
 
1529
1561
    def get_position(self, version_id):
1530
 
        """Return data position and size of specified version."""
1531
 
        bits = self._get_node(version_id)[1][1:].split(' ')
1532
 
        return int(bits[0]), int(bits[1])
 
1562
        """Return details needed to access the version.
 
1563
        
 
1564
        :return: a tuple (index, data position, size) to hand to the access
 
1565
            logic to get the record.
 
1566
        """
 
1567
        node = self._get_node(version_id)
 
1568
        bits = node[2][1:].split(' ')
 
1569
        return node[0], int(bits[0]), int(bits[1])
1533
1570
 
1534
1571
    def get_method(self, version_id):
1535
1572
        """Return compression method of specified version."""
1536
1573
        if not self._deltas:
1537
1574
            return 'fulltext'
1538
 
        return self._parent_compression(self._get_node(version_id)[2][1])
 
1575
        return self._parent_compression(self._get_node(version_id)[3][1])
1539
1576
 
1540
1577
    def _parent_compression(self, reference_list):
1541
1578
        # use the second reference list to decide if this is delta'd or not.
1545
1582
            return 'fulltext'
1546
1583
 
1547
1584
    def _get_node(self, version_id):
1548
 
        return list(self._get_entries([version_id]))[0]
 
1585
        return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
1549
1586
 
1550
1587
    def get_options(self, version_id):
1551
1588
        """Return a string represention options.
1556
1593
        if not self._deltas:
1557
1594
            options = ['fulltext']
1558
1595
        else:
1559
 
            options = [self._parent_compression(node[2][1])]
1560
 
        if node[1][0] == 'N':
 
1596
            options = [self._parent_compression(node[3][1])]
 
1597
        if node[2][0] == 'N':
1561
1598
            options.append('no-eol')
1562
 
        return ','.join(options)
 
1599
        return options
1563
1600
 
1564
1601
    def get_parents(self, version_id):
1565
1602
        """Return parents of specified version ignoring ghosts."""
1571
1608
 
1572
1609
    def get_parents_with_ghosts(self, version_id):
1573
1610
        """Return parents of specified version with ghosts."""
1574
 
        nodes = list(self._get_entries([version_id], check_present=True))
 
1611
        nodes = list(self._get_entries(self._version_ids_to_keys([version_id]),
 
1612
            check_present=True))
1575
1613
        if not self._parents:
1576
1614
            return ()
1577
 
        return nodes[0][2][0]
 
1615
        return self._keys_to_version_ids(nodes[0][3][0])
1578
1616
 
1579
1617
    def check_versions_present(self, version_ids):
1580
1618
        """Check that all specified versions are present."""
1581
 
        version_ids = set(version_ids)
1582
 
        present = self._present_keys(version_ids)
1583
 
        missing = version_ids.difference(present)
 
1619
        keys = self._version_ids_to_keys(version_ids)
 
1620
        present = self._present_keys(keys)
 
1621
        missing = keys.difference(present)
1584
1622
        if missing:
1585
1623
            raise RevisionNotPresent(missing.pop(), self)
1586
1624
 
1587
 
    def add_version(self, version_id, options, pos, size, parents):
 
1625
    def add_version(self, version_id, options, access_memo, parents):
1588
1626
        """Add a version record to the index."""
1589
 
        return self.add_versions(((version_id, options, pos, size, parents),))
 
1627
        return self.add_versions(((version_id, options, access_memo, parents),))
1590
1628
 
1591
1629
    def add_versions(self, versions):
1592
1630
        """Add multiple versions to the index.
1606
1644
        # check for dups
1607
1645
 
1608
1646
        keys = {}
1609
 
        for (version_id, options, pos, size, parents) in versions:
 
1647
        for (version_id, options, access_memo, parents) in versions:
 
1648
            index, pos, size = access_memo
 
1649
            key = (version_id, )
 
1650
            parents = tuple((parent, ) for parent in parents)
1610
1651
            if 'no-eol' in options:
1611
1652
                value = 'N'
1612
1653
            else:
1618
1659
            if self._parents:
1619
1660
                if self._deltas:
1620
1661
                    if 'line-delta' in options:
1621
 
                        node_refs = (tuple(parents), (parents[0],))
 
1662
                        node_refs = (parents, (parents[0],))
1622
1663
                    else:
1623
 
                        node_refs = (tuple(parents), ())
 
1664
                        node_refs = (parents, ())
1624
1665
                else:
1625
 
                    node_refs = (tuple(parents), )
 
1666
                    node_refs = (parents, )
1626
1667
            else:
1627
1668
                if parents:
1628
1669
                    raise KnitCorrupt(self, "attempt to add node with parents "
1629
1670
                        "in parentless index.")
1630
1671
                node_refs = ()
1631
 
            keys[version_id] = (value, node_refs)
 
1672
            keys[key] = (value, node_refs)
1632
1673
        present_nodes = self._get_entries(keys)
1633
 
        for (key, value, node_refs) in present_nodes:
 
1674
        for (index, key, value, node_refs) in present_nodes:
1634
1675
            if (value, node_refs) != keys[key]:
1635
1676
                raise KnitCorrupt(self, "inconsistent details in add_versions"
1636
1677
                    ": %s %s" % ((value, node_refs), keys[key]))
1644
1685
                result.append((key, value))
1645
1686
        self._add_callback(result)
1646
1687
        
1647
 
 
1648
 
class _KnitData(_KnitComponentFile):
1649
 
    """Contents of the knit data file"""
1650
 
 
1651
 
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
1652
 
                 create_parent_dir=False, delay_create=False,
1653
 
                 dir_mode=None):
1654
 
        _KnitComponentFile.__init__(self, transport, filename, mode,
1655
 
                                    file_mode=file_mode,
1656
 
                                    create_parent_dir=create_parent_dir,
1657
 
                                    dir_mode=dir_mode)
 
1688
    def _version_ids_to_keys(self, version_ids):
 
1689
        return set((version_id, ) for version_id in version_ids)
 
1690
 
 
1691
 
 
1692
class _KnitAccess(object):
 
1693
    """Access to knit records in a .knit file."""
 
1694
 
 
1695
    def __init__(self, transport, filename, _file_mode, _dir_mode,
 
1696
        _need_to_create, _create_parent_dir):
 
1697
        """Create a _KnitAccess for accessing and inserting data.
 
1698
 
 
1699
        :param transport: The transport the .knit is located on.
 
1700
        :param filename: The filename of the .knit.
 
1701
        """
 
1702
        self._transport = transport
 
1703
        self._filename = filename
 
1704
        self._file_mode = _file_mode
 
1705
        self._dir_mode = _dir_mode
 
1706
        self._need_to_create = _need_to_create
 
1707
        self._create_parent_dir = _create_parent_dir
 
1708
 
 
1709
    def add_raw_records(self, sizes, raw_data):
 
1710
        """Add raw knit bytes to a storage area.
 
1711
 
 
1712
        The data is spooled to whereever the access method is storing data.
 
1713
 
 
1714
        :param sizes: An iterable containing the size of each raw data segment.
 
1715
        :param raw_data: A bytestring containing the data.
 
1716
        :return: A list of memos to retrieve the record later. Each memo is a
 
1717
            tuple - (index, pos, length), where the index field is always None
 
1718
            for the .knit access method.
 
1719
        """
 
1720
        assert type(raw_data) == str, \
 
1721
            'data must be plain bytes was %s' % type(raw_data)
 
1722
        if not self._need_to_create:
 
1723
            base = self._transport.append_bytes(self._filename, raw_data)
 
1724
        else:
 
1725
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
 
1726
                                   create_parent_dir=self._create_parent_dir,
 
1727
                                   mode=self._file_mode,
 
1728
                                   dir_mode=self._dir_mode)
 
1729
            self._need_to_create = False
 
1730
            base = 0
 
1731
        result = []
 
1732
        for size in sizes:
 
1733
            result.append((None, base, size))
 
1734
            base += size
 
1735
        return result
 
1736
 
 
1737
    def create(self):
 
1738
        """IFF this data access has its own storage area, initialise it.
 
1739
 
 
1740
        :return: None.
 
1741
        """
 
1742
        self._transport.put_bytes_non_atomic(self._filename, '',
 
1743
                                             mode=self._file_mode)
 
1744
 
 
1745
    def open_file(self):
 
1746
        """IFF this data access can be represented as a single file, open it.
 
1747
 
 
1748
        For knits that are not mapped to a single file on disk this will
 
1749
        always return None.
 
1750
 
 
1751
        :return: None or a file handle.
 
1752
        """
 
1753
        try:
 
1754
            return self._transport.get(self._filename)
 
1755
        except NoSuchFile:
 
1756
            pass
 
1757
        return None
 
1758
 
 
1759
    def get_raw_records(self, memos_for_retrieval):
 
1760
        """Get the raw bytes for a records.
 
1761
 
 
1762
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1763
            length) memo for retrieving the bytes. The .knit method ignores
 
1764
            the index as there is always only a single file.
 
1765
        :return: An iterator over the bytes of the records.
 
1766
        """
 
1767
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
 
1768
        for pos, data in self._transport.readv(self._filename, read_vector):
 
1769
            yield data
 
1770
 
 
1771
 
 
1772
class _PackAccess(object):
 
1773
    """Access to knit records via a collection of packs."""
 
1774
 
 
1775
    def __init__(self, index_to_packs, writer=None):
 
1776
        """Create a _PackAccess object.
 
1777
 
 
1778
        :param index_to_packs: A dict mapping index objects to the transport
 
1779
            and file names for obtaining data.
 
1780
        :param writer: A tuple (pack.ContainerWriter, write_index) which
 
1781
            contains the pack to write, and the index that reads from it will
 
1782
            be associated with.
 
1783
        """
 
1784
        if writer:
 
1785
            self.container_writer = writer[0]
 
1786
            self.write_index = writer[1]
 
1787
        else:
 
1788
            self.container_writer = None
 
1789
            self.write_index = None
 
1790
        self.indices = index_to_packs
 
1791
 
 
1792
    def add_raw_records(self, sizes, raw_data):
 
1793
        """Add raw knit bytes to a storage area.
 
1794
 
 
1795
        The data is spooled to the container writer in one bytes-record per
 
1796
        raw data item.
 
1797
 
 
1798
        :param sizes: An iterable containing the size of each raw data segment.
 
1799
        :param raw_data: A bytestring containing the data.
 
1800
        :return: A list of memos to retrieve the record later. Each memo is a
 
1801
            tuple - (index, pos, length), where the index field is the 
 
1802
            write_index object supplied to the PackAccess object.
 
1803
        """
 
1804
        assert type(raw_data) == str, \
 
1805
            'data must be plain bytes was %s' % type(raw_data)
 
1806
        result = []
 
1807
        offset = 0
 
1808
        for size in sizes:
 
1809
            p_offset, p_length = self.container_writer.add_bytes_record(
 
1810
                raw_data[offset:offset+size], [])
 
1811
            offset += size
 
1812
            result.append((self.write_index, p_offset, p_length))
 
1813
        return result
 
1814
 
 
1815
    def create(self):
 
1816
        """Pack based knits do not get individually created."""
 
1817
 
 
1818
    def get_raw_records(self, memos_for_retrieval):
 
1819
        """Get the raw bytes for a records.
 
1820
 
 
1821
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1822
            length) memo for retrieving the bytes. The Pack access method
 
1823
            looks up the pack to use for a given record in its index_to_pack
 
1824
            map.
 
1825
        :return: An iterator over the bytes of the records.
 
1826
        """
 
1827
        # first pass, group into same-index requests
 
1828
        request_lists = []
 
1829
        current_index = None
 
1830
        for (index, offset, length) in memos_for_retrieval:
 
1831
            if current_index == index:
 
1832
                current_list.append((offset, length))
 
1833
            else:
 
1834
                if current_index is not None:
 
1835
                    request_lists.append((current_index, current_list))
 
1836
                current_index = index
 
1837
                current_list = [(offset, length)]
 
1838
        # handle the last entry
 
1839
        if current_index is not None:
 
1840
            request_lists.append((current_index, current_list))
 
1841
        for index, offsets in request_lists:
 
1842
            transport, path = self.indices[index]
 
1843
            reader = pack.make_readv_reader(transport, path, offsets)
 
1844
            for names, read_func in reader.iter_records():
 
1845
                yield read_func(None)
 
1846
 
 
1847
    def open_file(self):
 
1848
        """Pack based knits have no single file."""
 
1849
        return None
 
1850
 
 
1851
    def set_writer(self, writer, index, (transport, packname)):
 
1852
        """Set a writer to use for adding data."""
 
1853
        self.indices[index] = (transport, packname)
 
1854
        self.container_writer = writer
 
1855
        self.write_index = index
 
1856
 
 
1857
 
 
1858
class _KnitData(object):
 
1859
    """Manage extraction of data from a KnitAccess, caching and decompressing.
 
1860
    
 
1861
    The KnitData class provides the logic for parsing and using knit records,
 
1862
    making use of an access method for the low level read and write operations.
 
1863
    """
 
1864
 
 
1865
    def __init__(self, access):
 
1866
        """Create a KnitData object.
 
1867
 
 
1868
        :param access: The access method to use. Access methods such as
 
1869
            _KnitAccess manage the insertion of raw records and the subsequent
 
1870
            retrieval of the same.
 
1871
        """
 
1872
        self._access = access
1658
1873
        self._checked = False
1659
1874
        # TODO: jam 20060713 conceptually, this could spill to disk
1660
1875
        #       if the cached size gets larger than a certain amount
1662
1877
        #       a simple dictionary
1663
1878
        self._cache = {}
1664
1879
        self._do_cache = False
1665
 
        if create:
1666
 
            if delay_create:
1667
 
                self._need_to_create = create
1668
 
            else:
1669
 
                self._transport.put_bytes_non_atomic(self._filename, '',
1670
 
                                                     mode=self._file_mode)
1671
1880
 
1672
1881
    def enable_cache(self):
1673
1882
        """Enable caching of reads."""
1679
1888
        self._cache = {}
1680
1889
 
1681
1890
    def _open_file(self):
1682
 
        try:
1683
 
            return self._transport.get(self._filename)
1684
 
        except NoSuchFile:
1685
 
            pass
1686
 
        return None
 
1891
        return self._access.open_file()
1687
1892
 
1688
1893
    def _record_to_data(self, version_id, digest, lines):
1689
1894
        """Convert version_id, digest, lines into a raw data block.
1706
1911
        sio.seek(0)
1707
1912
        return length, sio
1708
1913
 
1709
 
    def add_raw_record(self, raw_data):
 
1914
    def add_raw_records(self, sizes, raw_data):
1710
1915
        """Append a prepared record to the data file.
1711
1916
        
1712
 
        :return: the offset in the data file raw_data was written.
 
1917
        :param sizes: An iterable containing the size of each raw data segment.
 
1918
        :param raw_data: A bytestring containing the data.
 
1919
        :return: a list of index data for the way the data was stored.
 
1920
            See the access method add_raw_records documentation for more
 
1921
            details.
1713
1922
        """
1714
 
        assert isinstance(raw_data, str), 'data must be plain bytes'
1715
 
        if not self._need_to_create:
1716
 
            return self._transport.append_bytes(self._filename, raw_data)
1717
 
        else:
1718
 
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
1719
 
                                   create_parent_dir=self._create_parent_dir,
1720
 
                                   mode=self._file_mode,
1721
 
                                   dir_mode=self._dir_mode)
1722
 
            self._need_to_create = False
1723
 
            return 0
 
1923
        return self._access.add_raw_records(sizes, raw_data)
1724
1924
        
1725
1925
    def add_record(self, version_id, digest, lines):
1726
 
        """Write new text record to disk.  Returns the position in the
1727
 
        file where it was written."""
 
1926
        """Write new text record to disk. 
 
1927
        
 
1928
        Returns index data for retrieving it later, as per add_raw_records.
 
1929
        """
1728
1930
        size, sio = self._record_to_data(version_id, digest, lines)
1729
 
        # write to disk
1730
 
        if not self._need_to_create:
1731
 
            start_pos = self._transport.append_file(self._filename, sio)
1732
 
        else:
1733
 
            self._transport.put_file_non_atomic(self._filename, sio,
1734
 
                               create_parent_dir=self._create_parent_dir,
1735
 
                               mode=self._file_mode,
1736
 
                               dir_mode=self._dir_mode)
1737
 
            self._need_to_create = False
1738
 
            start_pos = 0
 
1931
        result = self.add_raw_records([size], sio.getvalue())
1739
1932
        if self._do_cache:
1740
1933
            self._cache[version_id] = sio.getvalue()
1741
 
        return start_pos, size
 
1934
        return result[0]
1742
1935
 
1743
1936
    def _parse_record_header(self, version_id, raw_data):
1744
1937
        """Parse a record header for consistency.
1750
1943
        try:
1751
1944
            rec = self._check_header(version_id, df.readline())
1752
1945
        except Exception, e:
1753
 
            raise KnitCorrupt(self._filename,
 
1946
            raise KnitCorrupt(self._access,
1754
1947
                              "While reading {%s} got %s(%s)"
1755
1948
                              % (version_id, e.__class__.__name__, str(e)))
1756
1949
        return df, rec
1758
1951
    def _check_header(self, version_id, line):
1759
1952
        rec = line.split()
1760
1953
        if len(rec) != 4:
1761
 
            raise KnitCorrupt(self._filename,
 
1954
            raise KnitCorrupt(self._access,
1762
1955
                              'unexpected number of elements in record header')
1763
1956
        if rec[1] != version_id:
1764
 
            raise KnitCorrupt(self._filename,
 
1957
            raise KnitCorrupt(self._access,
1765
1958
                              'unexpected version, wanted %r, got %r'
1766
1959
                              % (version_id, rec[1]))
1767
1960
        return rec
1776
1969
        try:
1777
1970
            record_contents = df.readlines()
1778
1971
        except Exception, e:
1779
 
            raise KnitCorrupt(self._filename,
 
1972
            raise KnitCorrupt(self._access,
1780
1973
                              "While reading {%s} got %s(%s)"
1781
1974
                              % (version_id, e.__class__.__name__, str(e)))
1782
1975
        header = record_contents.pop(0)
1784
1977
 
1785
1978
        last_line = record_contents.pop()
1786
1979
        if len(record_contents) != int(rec[2]):
1787
 
            raise KnitCorrupt(self._filename,
 
1980
            raise KnitCorrupt(self._access,
1788
1981
                              'incorrect number of lines %s != %s'
1789
1982
                              ' for version {%s}'
1790
1983
                              % (len(record_contents), int(rec[2]),
1791
1984
                                 version_id))
1792
1985
        if last_line != 'end %s\n' % rec[1]:
1793
 
            raise KnitCorrupt(self._filename,
 
1986
            raise KnitCorrupt(self._access,
1794
1987
                              'unexpected version end line %r, wanted %r' 
1795
1988
                              % (last_line, version_id))
1796
1989
        df.close()
1808
2001
            # grab the disk data needed.
1809
2002
            if self._cache:
1810
2003
                # Don't check _cache if it is empty
1811
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
2004
                needed_offsets = [index_memo for version_id, index_memo
1812
2005
                                              in records
1813
2006
                                              if version_id not in self._cache]
1814
2007
            else:
1815
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
2008
                needed_offsets = [index_memo for version_id, index_memo
1816
2009
                                               in records]
1817
2010
 
1818
 
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
2011
            raw_records = self._access.get_raw_records(needed_offsets)
1819
2012
 
1820
 
        for version_id, pos, size in records:
 
2013
        for version_id, index_memo in records:
1821
2014
            if version_id in self._cache:
1822
2015
                # This data has already been validated
1823
2016
                data = self._cache[version_id]
1824
2017
            else:
1825
 
                pos, data = raw_records.next()
 
2018
                data = raw_records.next()
1826
2019
                if self._do_cache:
1827
2020
                    self._cache[version_id] = data
1828
2021
 
1867
2060
 
1868
2061
        # The transport optimizes the fetching as well 
1869
2062
        # (ie, reads continuous ranges.)
1870
 
        readv_response = self._transport.readv(self._filename,
1871
 
            [(pos, size) for version_id, pos, size in needed_records])
 
2063
        raw_data = self._access.get_raw_records(
 
2064
            [index_memo for version_id, index_memo in needed_records])
1872
2065
 
1873
 
        for (version_id, pos, size), (pos, data) in \
1874
 
                izip(iter(needed_records), readv_response):
 
2066
        for (version_id, index_memo), data in \
 
2067
                izip(iter(needed_records), raw_data):
1875
2068
            content, digest = self._parse_record(version_id, data)
1876
2069
            if self._do_cache:
1877
2070
                self._cache[version_id] = data
1966
2159
                    assert (self.target.has_version(parent) or
1967
2160
                            parent in copy_set or
1968
2161
                            not self.source.has_version(parent))
1969
 
                data_pos, data_size = self.source._index.get_position(version_id)
1970
 
                copy_queue_records.append((version_id, data_pos, data_size))
 
2162
                index_memo = self.source._index.get_position(version_id)
 
2163
                copy_queue_records.append((version_id, index_memo))
1971
2164
                copy_queue.append((version_id, options, parents))
1972
2165
                copy_set.add(version_id)
1973
2166