/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Aaron Bentley
  • Date: 2007-08-15 01:11:27 UTC
  • mfrom: (2699 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2700.
  • Revision ID: aaron.bentley@utoronto.ca-20070815011127-0il5s8oqmt26bma7
Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
70
70
import warnings
71
71
 
72
72
import bzrlib
 
73
from bzrlib.lazy_import import lazy_import
 
74
lazy_import(globals(), """
 
75
from bzrlib import (
 
76
    pack,
 
77
    )
 
78
""")
73
79
from bzrlib import (
74
80
    cache_utf8,
 
81
    diff,
75
82
    errors,
76
83
    osutils,
77
84
    patiencediff,
339
346
def make_empty_knit(transport, relpath):
340
347
    """Construct a empty knit at the specified location."""
341
348
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
342
 
    k._data._open_file()
343
349
 
344
350
 
345
351
class KnitVersionedFile(VersionedFile):
360
366
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
361
367
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
362
368
                 create=False, create_parent_dir=False, delay_create=False,
363
 
                 dir_mode=None, index=None):
 
369
                 dir_mode=None, index=None, access_method=None):
364
370
        """Construct a knit at location specified by relpath.
365
371
        
366
372
        :param create: If not True, only open an existing knit.
394
400
                dir_mode=dir_mode)
395
401
        else:
396
402
            self._index = index
397
 
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
398
 
            access_mode, create=create and not len(self), file_mode=file_mode,
399
 
            create_parent_dir=create_parent_dir, delay_create=delay_create,
400
 
            dir_mode=dir_mode)
 
403
        if access_method is None:
 
404
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
 
405
                ((create and not len(self)) and delay_create), create_parent_dir)
 
406
        else:
 
407
            _access = access_method
 
408
        if create and not len(self) and not delay_create:
 
409
            _access.create()
 
410
        self._data = _KnitData(_access)
401
411
 
402
412
    def __repr__(self):
403
413
        return '%s(%s)' % (self.__class__.__name__, 
420
430
        for count in xrange(self._max_delta_chain):
421
431
            parent = delta_parents[0]
422
432
            method = self._index.get_method(parent)
423
 
            pos, size = self._index.get_position(parent)
 
433
            index, pos, size = self._index.get_position(parent)
424
434
            if method == 'fulltext':
425
435
                fulltext_size = size
426
436
                break
484
494
        options.append('line-delta')
485
495
        store_lines = self.factory.lower_line_delta(delta)
486
496
 
487
 
        where, size = self._data.add_record(version_id, digest, store_lines)
488
 
        self._index.add_version(version_id, options, where, size, parents)
 
497
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
498
        self._index.add_version(version_id, options, access_memo, parents)
489
499
 
490
500
    def _add_raw_records(self, records, data):
491
501
        """Add all the records 'records' with data pre-joined in 'data'.
496
506
                     the preceding records sizes.
497
507
        """
498
508
        # write all the data
499
 
        pos = self._data.add_raw_record(data)
 
509
        raw_record_sizes = [record[3] for record in records]
 
510
        positions = self._data.add_raw_records(raw_record_sizes, data)
500
511
        offset = 0
501
512
        index_entries = []
502
 
        for (version_id, options, parents, size) in records:
503
 
            index_entries.append((version_id, options, pos+offset,
504
 
                                  size, parents))
 
513
        for (version_id, options, parents, size), access_memo in zip(
 
514
            records, positions):
 
515
            index_entries.append((version_id, options, access_memo, parents))
505
516
            if self._data._do_cache:
506
517
                self._data._cache[version_id] = data[offset:offset+size]
507
518
            offset += size
545
556
        current_values = self._index._cache[version_id]
546
557
        assert set(current_values[4]).difference(set(new_parents)) == set()
547
558
        self._index.add_version(version_id,
548
 
                                current_values[1], 
549
 
                                current_values[2],
550
 
                                current_values[3],
 
559
                                current_values[1],
 
560
                                (None, current_values[2], current_values[3]),
551
561
                                new_parents)
552
562
 
553
563
    def _extract_blocks(self, version_id, source, target):
568
578
            parent = parents[0]
569
579
        else:
570
580
            parent = None
571
 
        data_pos, data_size = self._index.get_position(version_id)
572
 
        data, sha1 = self._data.read_records(((version_id, data_pos, data_size),))[version_id]
 
581
        index_memo = self._index.get_position(version_id)
 
582
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
573
583
        noeol = 'no-eol' in self._index.get_options(version_id)
574
584
        if 'fulltext' == self._index.get_method(version_id):
575
585
            new_content = self.factory.parse_fulltext(data, version_id)
632
642
    __contains__ = has_version
633
643
 
634
644
    def _merge_annotations(self, content, parents, parent_texts={},
635
 
                           delta=None, annotated=None):
 
645
                           delta=None, annotated=None,
 
646
                           left_matching_blocks=None):
636
647
        """Merge annotations for content.  This is done by comparing
637
648
        the annotations based on changed to the text.
638
649
        """
 
650
        if left_matching_blocks is not None:
 
651
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
 
652
        else:
 
653
            delta_seq = None
639
654
        if annotated:
640
 
            delta_seq = None
641
655
            for parent_id in parents:
642
656
                merge_content = self._get_content(parent_id, parent_texts)
643
 
                seq = patiencediff.PatienceSequenceMatcher(
644
 
                                   None, merge_content.text(), content.text())
645
 
                if delta_seq is None:
646
 
                    # setup a delta seq to reuse.
647
 
                    delta_seq = seq
 
657
                if (parent_id == parents[0] and delta_seq is not None):
 
658
                    seq = delta_seq
 
659
                else:
 
660
                    seq = patiencediff.PatienceSequenceMatcher(
 
661
                        None, merge_content.text(), content.text())
648
662
                for i, j, n in seq.get_matching_blocks():
649
663
                    if n == 0:
650
664
                        continue
651
 
                    # this appears to copy (origin, text) pairs across to the new
652
 
                    # content for any line that matches the last-checked parent.
653
 
                    # FIXME: save the sequence control data for delta compression
654
 
                    # against the most relevant parent rather than rediffing.
 
665
                    # this appears to copy (origin, text) pairs across to the
 
666
                    # new content for any line that matches the last-checked
 
667
                    # parent.
655
668
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
656
669
        if delta:
657
 
            if not annotated:
 
670
            if delta_seq is None:
658
671
                reference_content = self._get_content(parents[0], parent_texts)
659
672
                new_texts = content.text()
660
673
                old_texts = reference_content.text()
693
706
                    next = None
694
707
                else:
695
708
                    next = self.get_parents(cursor)[0]
696
 
                data_pos, data_size = self._index.get_position(cursor)
697
 
                component_data[cursor] = (method, data_pos, data_size, next)
 
709
                index_memo = self._index.get_position(cursor)
 
710
                component_data[cursor] = (method, index_memo, next)
698
711
                cursor = next
699
712
        return component_data
700
713
       
720
733
        self._check_add(version_id, lines)
721
734
        return self._add(version_id, lines[:], parents, self.delta, parent_texts)
722
735
 
723
 
    def _add_lines(self, version_id, parents, lines, parent_texts):
 
736
    def _add_lines(self, version_id, parents, lines, parent_texts,
 
737
                   left_matching_blocks=None):
724
738
        """See VersionedFile.add_lines."""
725
739
        self._check_add(version_id, lines)
726
740
        self._check_versions_present(parents)
727
 
        return self._add(version_id, lines[:], parents, self.delta, parent_texts)
 
741
        return self._add(version_id, lines[:], parents, self.delta,
 
742
                         parent_texts, left_matching_blocks)
728
743
 
729
744
    def _check_add(self, version_id, lines):
730
745
        """check that version_id and lines are safe to add."""
738
753
        self._check_lines_not_unicode(lines)
739
754
        self._check_lines_are_lines(lines)
740
755
 
741
 
    def _add(self, version_id, lines, parents, delta, parent_texts):
 
756
    def _add(self, version_id, lines, parents, delta, parent_texts,
 
757
             left_matching_blocks=None):
742
758
        """Add a set of lines on top of version specified by parents.
743
759
 
744
760
        If delta is true, compress the text as a line-delta against
788
804
        lines = self.factory.make(lines, version_id)
789
805
        if delta or (self.factory.annotated and len(present_parents) > 0):
790
806
            # Merge annotations from parent texts if so is needed.
791
 
            delta_hunks = self._merge_annotations(lines, present_parents, parent_texts,
792
 
                                                  delta, self.factory.annotated)
 
807
            delta_hunks = self._merge_annotations(lines, present_parents,
 
808
                parent_texts, delta, self.factory.annotated,
 
809
                left_matching_blocks)
793
810
 
794
811
        if delta:
795
812
            options.append('line-delta')
798
815
            options.append('fulltext')
799
816
            store_lines = self.factory.lower_fulltext(lines)
800
817
 
801
 
        where, size = self._data.add_record(version_id, digest, store_lines)
802
 
        self._index.add_version(version_id, options, where, size, parents)
 
818
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
819
        self._index.add_version(version_id, options, access_memo, parents)
803
820
        return lines
804
821
 
805
822
    def check(self, progress_bar=None):
827
844
        If the method is fulltext, next will be None.
828
845
        """
829
846
        position_map = self._get_components_positions(version_ids)
830
 
        # c = component_id, m = method, p = position, s = size, n = next
831
 
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
 
847
        # c = component_id, m = method, i_m = index_memo, n = next
 
848
        records = [(c, i_m) for c, (m, i_m, n) in position_map.iteritems()]
832
849
        record_map = {}
833
850
        for component_id, content, digest in \
834
851
                self._data.read_records_iter(records):
835
 
            method, position, size, next = position_map[component_id]
 
852
            method, index_memo, next = position_map[component_id]
836
853
            record_map[component_id] = method, content, digest, next
837
854
                          
838
855
        return record_map
930
947
        # get a in-component-order queue:
931
948
        for version_id in self.versions():
932
949
            if version_id in requested_versions:
933
 
                data_pos, length = self._index.get_position(version_id)
934
 
                version_id_records.append((version_id, data_pos, length))
 
950
                index_memo = self._index.get_position(version_id)
 
951
                version_id_records.append((version_id, index_memo))
935
952
 
936
953
        total = len(version_id_records)
937
954
        for version_idx, (version_id, data, sha_value) in \
1073
1090
            raise KnitHeaderError(badline=line,
1074
1091
                              filename=self._transport.abspath(self._filename))
1075
1092
 
1076
 
    def commit(self):
1077
 
        """Commit is a nop."""
1078
 
 
1079
1093
    def __repr__(self):
1080
1094
        return '%s(%s)' % (self.__class__.__name__, self._filename)
1081
1095
 
1268
1282
                result_list.append('.' + version)
1269
1283
        return ' '.join(result_list)
1270
1284
 
1271
 
    def add_version(self, version_id, options, pos, size, parents):
 
1285
    def add_version(self, version_id, options, index_memo, parents):
1272
1286
        """Add a version record to the index."""
1273
 
        self.add_versions(((version_id, options, pos, size, parents),))
 
1287
        self.add_versions(((version_id, options, index_memo, parents),))
1274
1288
 
1275
1289
    def add_versions(self, versions):
1276
1290
        """Add multiple versions to the index.
1283
1297
        orig_cache = self._cache.copy()
1284
1298
 
1285
1299
        try:
1286
 
            for version_id, options, pos, size, parents in versions:
 
1300
            for version_id, options, (index, pos, size), parents in versions:
1287
1301
                line = "\n%s %s %s %s %s :" % (version_id,
1288
1302
                                               ','.join(options),
1289
1303
                                               pos,
1316
1330
        return version_id in self._cache
1317
1331
 
1318
1332
    def get_position(self, version_id):
1319
 
        """Return data position and size of specified version."""
 
1333
        """Return details needed to access the version.
 
1334
        
 
1335
        .kndx indices do not support split-out data, so return None for the 
 
1336
        index field.
 
1337
 
 
1338
        :return: a tuple (None, data position, size) to hand to the access
 
1339
            logic to get the record.
 
1340
        """
1320
1341
        entry = self._cache[version_id]
1321
 
        return entry[2], entry[3]
 
1342
        return None, entry[2], entry[3]
1322
1343
 
1323
1344
    def get_method(self, version_id):
1324
1345
        """Return compression method of specified version."""
1386
1407
        if self._parents:
1387
1408
            for node in self._graph_index.iter_entries(keys):
1388
1409
                yield node
1389
 
                found_keys.add(node[0])
 
1410
                found_keys.add(node[1])
1390
1411
        else:
1391
1412
            # adapt parentless index to the rest of the code.
1392
1413
            for node in self._graph_index.iter_entries(keys):
1393
 
                yield node[0], node[1], ()
1394
 
                found_keys.add(node[0])
 
1414
                yield node[0], node[1], node[2], ()
 
1415
                found_keys.add(node[1])
1395
1416
        if check_present:
1396
1417
            missing_keys = keys.difference(found_keys)
1397
1418
            if missing_keys:
1399
1420
 
1400
1421
    def _present_keys(self, version_ids):
1401
1422
        return set([
1402
 
            node[0] for node in self._get_entries(version_ids)])
 
1423
            node[1] for node in self._get_entries(version_ids)])
1403
1424
 
1404
1425
    def _parentless_ancestry(self, versions):
1405
1426
        """Honour the get_ancestry API for parentless knit indices."""
1427
1448
            new_nodes = self._get_entries(this_iteration)
1428
1449
            found = set()
1429
1450
            pending = set()
1430
 
            for (key, value, node_refs) in new_nodes:
 
1451
            for (index, key, value, node_refs) in new_nodes:
1431
1452
                # dont ask for ghosties - otherwise
1432
1453
                # we we can end up looping with pending
1433
1454
                # being entirely ghosted.
1464
1485
            this_iteration = pending
1465
1486
            new_nodes = self._get_entries(this_iteration)
1466
1487
            pending = set()
1467
 
            for (key, value, node_refs) in new_nodes:
 
1488
            for (index, key, value, node_refs) in new_nodes:
1468
1489
                graph[key] = node_refs[0]
1469
1490
                # queue parents 
1470
1491
                for parent in graph[key]:
1488
1509
        if not self._parents:
1489
1510
            return [(key, ()) for key in self.get_versions()]
1490
1511
        result = []
1491
 
        for key, value, refs in self._graph_index.iter_all_entries():
 
1512
        for index, key, value, refs in self._graph_index.iter_all_entries():
1492
1513
            result.append((key[0], tuple([ref[0] for ref in refs[0]])))
1493
1514
        return result
1494
1515
 
1506
1527
            all_parents = set()
1507
1528
            present_parents = set()
1508
1529
            for node in all_nodes:
1509
 
                all_parents.update(node[2][0])
 
1530
                all_parents.update(node[3][0])
1510
1531
                # any node we are querying must be present
1511
 
                present_parents.add(node[0])
 
1532
                present_parents.add(node[1])
1512
1533
            unknown_parents = all_parents.difference(present_parents)
1513
1534
            present_parents.update(self._present_keys(unknown_parents))
1514
1535
            for node in all_nodes:
1515
1536
                parents = []
1516
 
                for parent in node[2][0]:
 
1537
                for parent in node[3][0]:
1517
1538
                    if parent in present_parents:
1518
1539
                        parents.append(parent[0])
1519
 
                yield node[0][0], tuple(parents)
 
1540
                yield node[1][0], tuple(parents)
1520
1541
        else:
1521
1542
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
1522
 
                yield node[0][0], ()
 
1543
                yield node[1][0], ()
1523
1544
 
1524
1545
    def num_versions(self):
1525
1546
        return len(list(self._graph_index.iter_all_entries()))
1528
1549
 
1529
1550
    def get_versions(self):
1530
1551
        """Get all the versions in the file. not topologically sorted."""
1531
 
        return [node[0][0] for node in self._graph_index.iter_all_entries()]
 
1552
        return [node[1][0] for node in self._graph_index.iter_all_entries()]
1532
1553
    
1533
1554
    def has_version(self, version_id):
1534
1555
        """True if the version is in the index."""
1538
1559
        return tuple(key[0] for key in keys)
1539
1560
 
1540
1561
    def get_position(self, version_id):
1541
 
        """Return data position and size of specified version."""
1542
 
        bits = self._get_node(version_id)[1][1:].split(' ')
1543
 
        return int(bits[0]), int(bits[1])
 
1562
        """Return details needed to access the version.
 
1563
        
 
1564
        :return: a tuple (index, data position, size) to hand to the access
 
1565
            logic to get the record.
 
1566
        """
 
1567
        node = self._get_node(version_id)
 
1568
        bits = node[2][1:].split(' ')
 
1569
        return node[0], int(bits[0]), int(bits[1])
1544
1570
 
1545
1571
    def get_method(self, version_id):
1546
1572
        """Return compression method of specified version."""
1547
1573
        if not self._deltas:
1548
1574
            return 'fulltext'
1549
 
        return self._parent_compression(self._get_node(version_id)[2][1])
 
1575
        return self._parent_compression(self._get_node(version_id)[3][1])
1550
1576
 
1551
1577
    def _parent_compression(self, reference_list):
1552
1578
        # use the second reference list to decide if this is delta'd or not.
1567
1593
        if not self._deltas:
1568
1594
            options = ['fulltext']
1569
1595
        else:
1570
 
            options = [self._parent_compression(node[2][1])]
1571
 
        if node[1][0] == 'N':
 
1596
            options = [self._parent_compression(node[3][1])]
 
1597
        if node[2][0] == 'N':
1572
1598
            options.append('no-eol')
1573
1599
        return options
1574
1600
 
1586
1612
            check_present=True))
1587
1613
        if not self._parents:
1588
1614
            return ()
1589
 
        return self._keys_to_version_ids(nodes[0][2][0])
 
1615
        return self._keys_to_version_ids(nodes[0][3][0])
1590
1616
 
1591
1617
    def check_versions_present(self, version_ids):
1592
1618
        """Check that all specified versions are present."""
1596
1622
        if missing:
1597
1623
            raise RevisionNotPresent(missing.pop(), self)
1598
1624
 
1599
 
    def add_version(self, version_id, options, pos, size, parents):
 
1625
    def add_version(self, version_id, options, access_memo, parents):
1600
1626
        """Add a version record to the index."""
1601
 
        return self.add_versions(((version_id, options, pos, size, parents),))
 
1627
        return self.add_versions(((version_id, options, access_memo, parents),))
1602
1628
 
1603
1629
    def add_versions(self, versions):
1604
1630
        """Add multiple versions to the index.
1618
1644
        # check for dups
1619
1645
 
1620
1646
        keys = {}
1621
 
        for (version_id, options, pos, size, parents) in versions:
1622
 
            # index keys are tuples:
 
1647
        for (version_id, options, access_memo, parents) in versions:
 
1648
            index, pos, size = access_memo
1623
1649
            key = (version_id, )
1624
1650
            parents = tuple((parent, ) for parent in parents)
1625
1651
            if 'no-eol' in options:
1645
1671
                node_refs = ()
1646
1672
            keys[key] = (value, node_refs)
1647
1673
        present_nodes = self._get_entries(keys)
1648
 
        for (key, value, node_refs) in present_nodes:
 
1674
        for (index, key, value, node_refs) in present_nodes:
1649
1675
            if (value, node_refs) != keys[key]:
1650
1676
                raise KnitCorrupt(self, "inconsistent details in add_versions"
1651
1677
                    ": %s %s" % ((value, node_refs), keys[key]))
1661
1687
        
1662
1688
    def _version_ids_to_keys(self, version_ids):
1663
1689
        return set((version_id, ) for version_id in version_ids)
1664
 
        
1665
 
 
1666
 
class _KnitData(_KnitComponentFile):
1667
 
    """Contents of the knit data file"""
1668
 
 
1669
 
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
1670
 
                 create_parent_dir=False, delay_create=False,
1671
 
                 dir_mode=None):
1672
 
        _KnitComponentFile.__init__(self, transport, filename, mode,
1673
 
                                    file_mode=file_mode,
1674
 
                                    create_parent_dir=create_parent_dir,
1675
 
                                    dir_mode=dir_mode)
 
1690
 
 
1691
 
 
1692
class _KnitAccess(object):
 
1693
    """Access to knit records in a .knit file."""
 
1694
 
 
1695
    def __init__(self, transport, filename, _file_mode, _dir_mode,
 
1696
        _need_to_create, _create_parent_dir):
 
1697
        """Create a _KnitAccess for accessing and inserting data.
 
1698
 
 
1699
        :param transport: The transport the .knit is located on.
 
1700
        :param filename: The filename of the .knit.
 
1701
        """
 
1702
        self._transport = transport
 
1703
        self._filename = filename
 
1704
        self._file_mode = _file_mode
 
1705
        self._dir_mode = _dir_mode
 
1706
        self._need_to_create = _need_to_create
 
1707
        self._create_parent_dir = _create_parent_dir
 
1708
 
 
1709
    def add_raw_records(self, sizes, raw_data):
 
1710
        """Add raw knit bytes to a storage area.
 
1711
 
 
1712
        The data is spooled to whereever the access method is storing data.
 
1713
 
 
1714
        :param sizes: An iterable containing the size of each raw data segment.
 
1715
        :param raw_data: A bytestring containing the data.
 
1716
        :return: A list of memos to retrieve the record later. Each memo is a
 
1717
            tuple - (index, pos, length), where the index field is always None
 
1718
            for the .knit access method.
 
1719
        """
 
1720
        assert type(raw_data) == str, \
 
1721
            'data must be plain bytes was %s' % type(raw_data)
 
1722
        if not self._need_to_create:
 
1723
            base = self._transport.append_bytes(self._filename, raw_data)
 
1724
        else:
 
1725
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
 
1726
                                   create_parent_dir=self._create_parent_dir,
 
1727
                                   mode=self._file_mode,
 
1728
                                   dir_mode=self._dir_mode)
 
1729
            self._need_to_create = False
 
1730
            base = 0
 
1731
        result = []
 
1732
        for size in sizes:
 
1733
            result.append((None, base, size))
 
1734
            base += size
 
1735
        return result
 
1736
 
 
1737
    def create(self):
 
1738
        """IFF this data access has its own storage area, initialise it.
 
1739
 
 
1740
        :return: None.
 
1741
        """
 
1742
        self._transport.put_bytes_non_atomic(self._filename, '',
 
1743
                                             mode=self._file_mode)
 
1744
 
 
1745
    def open_file(self):
 
1746
        """IFF this data access can be represented as a single file, open it.
 
1747
 
 
1748
        For knits that are not mapped to a single file on disk this will
 
1749
        always return None.
 
1750
 
 
1751
        :return: None or a file handle.
 
1752
        """
 
1753
        try:
 
1754
            return self._transport.get(self._filename)
 
1755
        except NoSuchFile:
 
1756
            pass
 
1757
        return None
 
1758
 
 
1759
    def get_raw_records(self, memos_for_retrieval):
 
1760
        """Get the raw bytes for a records.
 
1761
 
 
1762
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1763
            length) memo for retrieving the bytes. The .knit method ignores
 
1764
            the index as there is always only a single file.
 
1765
        :return: An iterator over the bytes of the records.
 
1766
        """
 
1767
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
 
1768
        for pos, data in self._transport.readv(self._filename, read_vector):
 
1769
            yield data
 
1770
 
 
1771
 
 
1772
class _PackAccess(object):
 
1773
    """Access to knit records via a collection of packs."""
 
1774
 
 
1775
    def __init__(self, index_to_packs, writer=None):
 
1776
        """Create a _PackAccess object.
 
1777
 
 
1778
        :param index_to_packs: A dict mapping index objects to the transport
 
1779
            and file names for obtaining data.
 
1780
        :param writer: A tuple (pack.ContainerWriter, write_index) which
 
1781
            contains the pack to write, and the index that reads from it will
 
1782
            be associated with.
 
1783
        """
 
1784
        if writer:
 
1785
            self.container_writer = writer[0]
 
1786
            self.write_index = writer[1]
 
1787
        else:
 
1788
            self.container_writer = None
 
1789
            self.write_index = None
 
1790
        self.indices = index_to_packs
 
1791
 
 
1792
    def add_raw_records(self, sizes, raw_data):
 
1793
        """Add raw knit bytes to a storage area.
 
1794
 
 
1795
        The data is spooled to the container writer in one bytes-record per
 
1796
        raw data item.
 
1797
 
 
1798
        :param sizes: An iterable containing the size of each raw data segment.
 
1799
        :param raw_data: A bytestring containing the data.
 
1800
        :return: A list of memos to retrieve the record later. Each memo is a
 
1801
            tuple - (index, pos, length), where the index field is the 
 
1802
            write_index object supplied to the PackAccess object.
 
1803
        """
 
1804
        assert type(raw_data) == str, \
 
1805
            'data must be plain bytes was %s' % type(raw_data)
 
1806
        result = []
 
1807
        offset = 0
 
1808
        for size in sizes:
 
1809
            p_offset, p_length = self.container_writer.add_bytes_record(
 
1810
                raw_data[offset:offset+size], [])
 
1811
            offset += size
 
1812
            result.append((self.write_index, p_offset, p_length))
 
1813
        return result
 
1814
 
 
1815
    def create(self):
 
1816
        """Pack based knits do not get individually created."""
 
1817
 
 
1818
    def get_raw_records(self, memos_for_retrieval):
 
1819
        """Get the raw bytes for a records.
 
1820
 
 
1821
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1822
            length) memo for retrieving the bytes. The Pack access method
 
1823
            looks up the pack to use for a given record in its index_to_pack
 
1824
            map.
 
1825
        :return: An iterator over the bytes of the records.
 
1826
        """
 
1827
        # first pass, group into same-index requests
 
1828
        request_lists = []
 
1829
        current_index = None
 
1830
        for (index, offset, length) in memos_for_retrieval:
 
1831
            if current_index == index:
 
1832
                current_list.append((offset, length))
 
1833
            else:
 
1834
                if current_index is not None:
 
1835
                    request_lists.append((current_index, current_list))
 
1836
                current_index = index
 
1837
                current_list = [(offset, length)]
 
1838
        # handle the last entry
 
1839
        if current_index is not None:
 
1840
            request_lists.append((current_index, current_list))
 
1841
        for index, offsets in request_lists:
 
1842
            transport, path = self.indices[index]
 
1843
            reader = pack.make_readv_reader(transport, path, offsets)
 
1844
            for names, read_func in reader.iter_records():
 
1845
                yield read_func(None)
 
1846
 
 
1847
    def open_file(self):
 
1848
        """Pack based knits have no single file."""
 
1849
        return None
 
1850
 
 
1851
    def set_writer(self, writer, index, (transport, packname)):
 
1852
        """Set a writer to use for adding data."""
 
1853
        self.indices[index] = (transport, packname)
 
1854
        self.container_writer = writer
 
1855
        self.write_index = index
 
1856
 
 
1857
 
 
1858
class _KnitData(object):
 
1859
    """Manage extraction of data from a KnitAccess, caching and decompressing.
 
1860
    
 
1861
    The KnitData class provides the logic for parsing and using knit records,
 
1862
    making use of an access method for the low level read and write operations.
 
1863
    """
 
1864
 
 
1865
    def __init__(self, access):
 
1866
        """Create a KnitData object.
 
1867
 
 
1868
        :param access: The access method to use. Access methods such as
 
1869
            _KnitAccess manage the insertion of raw records and the subsequent
 
1870
            retrieval of the same.
 
1871
        """
 
1872
        self._access = access
1676
1873
        self._checked = False
1677
1874
        # TODO: jam 20060713 conceptually, this could spill to disk
1678
1875
        #       if the cached size gets larger than a certain amount
1680
1877
        #       a simple dictionary
1681
1878
        self._cache = {}
1682
1879
        self._do_cache = False
1683
 
        if create:
1684
 
            if delay_create:
1685
 
                self._need_to_create = create
1686
 
            else:
1687
 
                self._transport.put_bytes_non_atomic(self._filename, '',
1688
 
                                                     mode=self._file_mode)
1689
1880
 
1690
1881
    def enable_cache(self):
1691
1882
        """Enable caching of reads."""
1697
1888
        self._cache = {}
1698
1889
 
1699
1890
    def _open_file(self):
1700
 
        try:
1701
 
            return self._transport.get(self._filename)
1702
 
        except NoSuchFile:
1703
 
            pass
1704
 
        return None
 
1891
        return self._access.open_file()
1705
1892
 
1706
1893
    def _record_to_data(self, version_id, digest, lines):
1707
1894
        """Convert version_id, digest, lines into a raw data block.
1724
1911
        sio.seek(0)
1725
1912
        return length, sio
1726
1913
 
1727
 
    def add_raw_record(self, raw_data):
 
1914
    def add_raw_records(self, sizes, raw_data):
1728
1915
        """Append a prepared record to the data file.
1729
1916
        
1730
 
        :return: the offset in the data file raw_data was written.
 
1917
        :param sizes: An iterable containing the size of each raw data segment.
 
1918
        :param raw_data: A bytestring containing the data.
 
1919
        :return: a list of index data for the way the data was stored.
 
1920
            See the access method add_raw_records documentation for more
 
1921
            details.
1731
1922
        """
1732
 
        assert isinstance(raw_data, str), 'data must be plain bytes'
1733
 
        if not self._need_to_create:
1734
 
            return self._transport.append_bytes(self._filename, raw_data)
1735
 
        else:
1736
 
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
1737
 
                                   create_parent_dir=self._create_parent_dir,
1738
 
                                   mode=self._file_mode,
1739
 
                                   dir_mode=self._dir_mode)
1740
 
            self._need_to_create = False
1741
 
            return 0
 
1923
        return self._access.add_raw_records(sizes, raw_data)
1742
1924
        
1743
1925
    def add_record(self, version_id, digest, lines):
1744
 
        """Write new text record to disk.  Returns the position in the
1745
 
        file where it was written."""
 
1926
        """Write new text record to disk. 
 
1927
        
 
1928
        Returns index data for retrieving it later, as per add_raw_records.
 
1929
        """
1746
1930
        size, sio = self._record_to_data(version_id, digest, lines)
1747
 
        # write to disk
1748
 
        if not self._need_to_create:
1749
 
            start_pos = self._transport.append_file(self._filename, sio)
1750
 
        else:
1751
 
            self._transport.put_file_non_atomic(self._filename, sio,
1752
 
                               create_parent_dir=self._create_parent_dir,
1753
 
                               mode=self._file_mode,
1754
 
                               dir_mode=self._dir_mode)
1755
 
            self._need_to_create = False
1756
 
            start_pos = 0
 
1931
        result = self.add_raw_records([size], sio.getvalue())
1757
1932
        if self._do_cache:
1758
1933
            self._cache[version_id] = sio.getvalue()
1759
 
        return start_pos, size
 
1934
        return result[0]
1760
1935
 
1761
1936
    def _parse_record_header(self, version_id, raw_data):
1762
1937
        """Parse a record header for consistency.
1768
1943
        try:
1769
1944
            rec = self._check_header(version_id, df.readline())
1770
1945
        except Exception, e:
1771
 
            raise KnitCorrupt(self._filename,
 
1946
            raise KnitCorrupt(self._access,
1772
1947
                              "While reading {%s} got %s(%s)"
1773
1948
                              % (version_id, e.__class__.__name__, str(e)))
1774
1949
        return df, rec
1776
1951
    def _check_header(self, version_id, line):
1777
1952
        rec = line.split()
1778
1953
        if len(rec) != 4:
1779
 
            raise KnitCorrupt(self._filename,
 
1954
            raise KnitCorrupt(self._access,
1780
1955
                              'unexpected number of elements in record header')
1781
1956
        if rec[1] != version_id:
1782
 
            raise KnitCorrupt(self._filename,
 
1957
            raise KnitCorrupt(self._access,
1783
1958
                              'unexpected version, wanted %r, got %r'
1784
1959
                              % (version_id, rec[1]))
1785
1960
        return rec
1794
1969
        try:
1795
1970
            record_contents = df.readlines()
1796
1971
        except Exception, e:
1797
 
            raise KnitCorrupt(self._filename,
 
1972
            raise KnitCorrupt(self._access,
1798
1973
                              "While reading {%s} got %s(%s)"
1799
1974
                              % (version_id, e.__class__.__name__, str(e)))
1800
1975
        header = record_contents.pop(0)
1802
1977
 
1803
1978
        last_line = record_contents.pop()
1804
1979
        if len(record_contents) != int(rec[2]):
1805
 
            raise KnitCorrupt(self._filename,
 
1980
            raise KnitCorrupt(self._access,
1806
1981
                              'incorrect number of lines %s != %s'
1807
1982
                              ' for version {%s}'
1808
1983
                              % (len(record_contents), int(rec[2]),
1809
1984
                                 version_id))
1810
1985
        if last_line != 'end %s\n' % rec[1]:
1811
 
            raise KnitCorrupt(self._filename,
 
1986
            raise KnitCorrupt(self._access,
1812
1987
                              'unexpected version end line %r, wanted %r' 
1813
1988
                              % (last_line, version_id))
1814
1989
        df.close()
1826
2001
            # grab the disk data needed.
1827
2002
            if self._cache:
1828
2003
                # Don't check _cache if it is empty
1829
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
2004
                needed_offsets = [index_memo for version_id, index_memo
1830
2005
                                              in records
1831
2006
                                              if version_id not in self._cache]
1832
2007
            else:
1833
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
2008
                needed_offsets = [index_memo for version_id, index_memo
1834
2009
                                               in records]
1835
2010
 
1836
 
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
2011
            raw_records = self._access.get_raw_records(needed_offsets)
1837
2012
 
1838
 
        for version_id, pos, size in records:
 
2013
        for version_id, index_memo in records:
1839
2014
            if version_id in self._cache:
1840
2015
                # This data has already been validated
1841
2016
                data = self._cache[version_id]
1842
2017
            else:
1843
 
                pos, data = raw_records.next()
 
2018
                data = raw_records.next()
1844
2019
                if self._do_cache:
1845
2020
                    self._cache[version_id] = data
1846
2021
 
1885
2060
 
1886
2061
        # The transport optimizes the fetching as well 
1887
2062
        # (ie, reads continuous ranges.)
1888
 
        readv_response = self._transport.readv(self._filename,
1889
 
            [(pos, size) for version_id, pos, size in needed_records])
 
2063
        raw_data = self._access.get_raw_records(
 
2064
            [index_memo for version_id, index_memo in needed_records])
1890
2065
 
1891
 
        for (version_id, pos, size), (pos, data) in \
1892
 
                izip(iter(needed_records), readv_response):
 
2066
        for (version_id, index_memo), data in \
 
2067
                izip(iter(needed_records), raw_data):
1893
2068
            content, digest = self._parse_record(version_id, data)
1894
2069
            if self._do_cache:
1895
2070
                self._cache[version_id] = data
1984
2159
                    assert (self.target.has_version(parent) or
1985
2160
                            parent in copy_set or
1986
2161
                            not self.source.has_version(parent))
1987
 
                data_pos, data_size = self.source._index.get_position(version_id)
1988
 
                copy_queue_records.append((version_id, data_pos, data_size))
 
2162
                index_memo = self.source._index.get_position(version_id)
 
2163
                copy_queue_records.append((version_id, index_memo))
1989
2164
                copy_queue.append((version_id, options, parents))
1990
2165
                copy_set.add(version_id)
1991
2166