/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Martin Pool
  • Date: 2008-05-27 03:00:53 UTC
  • mfrom: (3452 +trunk)
  • mto: (3724.1.1 lock-hooks)
  • mto: This revision was merged to the branch mainline in revision 3730.
  • Revision ID: mbp@sourcefrog.net-20080527030053-0mct6dypek0ysjc3
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
107
107
    contains_linebreaks,
108
108
    sha_string,
109
109
    sha_strings,
110
 
    )
111
 
from bzrlib.symbol_versioning import (
112
 
    DEPRECATED_PARAMETER,
113
 
    deprecated_method,
114
 
    deprecated_passed,
115
 
    one_four,
 
110
    split_lines,
116
111
    )
117
112
from bzrlib.tsort import topo_sort
118
113
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
119
114
import bzrlib.ui
120
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
 
115
from bzrlib.versionedfile import (
 
116
    AbsentContentFactory,
 
117
    adapter_registry,
 
118
    ContentFactory,
 
119
    InterVersionedFile,
 
120
    VersionedFile,
 
121
    )
121
122
import bzrlib.weave
122
123
 
123
124
 
138
139
INDEX_SUFFIX = '.kndx'
139
140
 
140
141
 
 
142
class KnitAdapter(object):
 
143
    """Base class for knit record adaption."""
 
144
 
 
145
    def __init__(self, basis_vf):
 
146
        """Create an adapter which accesses full texts from basis_vf.
 
147
        
 
148
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
149
            May be None for adapters that do not need to access basis texts.
 
150
        """
 
151
        self._data = _KnitData(None)
 
152
        self._annotate_factory = KnitAnnotateFactory()
 
153
        self._plain_factory = KnitPlainFactory()
 
154
        self._basis_vf = basis_vf
 
155
 
 
156
 
 
157
class FTAnnotatedToUnannotated(KnitAdapter):
 
158
    """An adapter from FT annotated knits to unannotated ones."""
 
159
 
 
160
    def get_bytes(self, factory, annotated_compressed_bytes):
 
161
        rec, contents = \
 
162
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
163
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
164
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
 
165
        return bytes
 
166
 
 
167
 
 
168
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
169
    """An adapter for deltas from annotated to unannotated."""
 
170
 
 
171
    def get_bytes(self, factory, annotated_compressed_bytes):
 
172
        rec, contents = \
 
173
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
174
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
175
            plain=True)
 
176
        contents = self._plain_factory.lower_line_delta(delta)
 
177
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
 
178
        return bytes
 
179
 
 
180
 
 
181
class FTAnnotatedToFullText(KnitAdapter):
 
182
    """An adapter from FT annotated knits to unannotated ones."""
 
183
 
 
184
    def get_bytes(self, factory, annotated_compressed_bytes):
 
185
        rec, contents = \
 
186
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
187
        content, delta = self._annotate_factory.parse_record(factory.key[0],
 
188
            contents, factory._build_details, None)
 
189
        return ''.join(content.text())
 
190
 
 
191
 
 
192
class DeltaAnnotatedToFullText(KnitAdapter):
 
193
    """An adapter for deltas from annotated to unannotated."""
 
194
 
 
195
    def get_bytes(self, factory, annotated_compressed_bytes):
 
196
        rec, contents = \
 
197
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
198
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
199
            plain=True)
 
200
        compression_parent = factory.parents[0][0]
 
201
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
202
        # Manually apply the delta because we have one annotated content and
 
203
        # one plain.
 
204
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
205
        basis_content.apply_delta(delta, rec[1])
 
206
        basis_content._should_strip_eol = factory._build_details[1]
 
207
        return ''.join(basis_content.text())
 
208
 
 
209
 
 
210
class FTPlainToFullText(KnitAdapter):
 
211
    """An adapter from FT plain knits to unannotated ones."""
 
212
 
 
213
    def get_bytes(self, factory, compressed_bytes):
 
214
        rec, contents = \
 
215
            self._data._parse_record_unchecked(compressed_bytes)
 
216
        content, delta = self._plain_factory.parse_record(factory.key[0],
 
217
            contents, factory._build_details, None)
 
218
        return ''.join(content.text())
 
219
 
 
220
 
 
221
class DeltaPlainToFullText(KnitAdapter):
 
222
    """An adapter for deltas from annotated to unannotated."""
 
223
 
 
224
    def get_bytes(self, factory, compressed_bytes):
 
225
        rec, contents = \
 
226
            self._data._parse_record_unchecked(compressed_bytes)
 
227
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
228
        compression_parent = factory.parents[0][0]
 
229
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
230
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
231
        # Manually apply the delta because we have one annotated content and
 
232
        # one plain.
 
233
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
234
            factory._build_details, basis_content)
 
235
        return ''.join(content.text())
 
236
 
 
237
 
 
238
class KnitContentFactory(ContentFactory):
 
239
    """Content factory for streaming from knits.
 
240
    
 
241
    :seealso ContentFactory:
 
242
    """
 
243
 
 
244
    def __init__(self, version, parents, build_details, sha1, raw_record,
 
245
        annotated, knit=None):
 
246
        """Create a KnitContentFactory for version.
 
247
        
 
248
        :param version: The version.
 
249
        :param parents: The parents.
 
250
        :param build_details: The build details as returned from
 
251
            get_build_details.
 
252
        :param sha1: The sha1 expected from the full text of this object.
 
253
        :param raw_record: The bytes of the knit data from disk.
 
254
        :param annotated: True if the raw data is annotated.
 
255
        """
 
256
        ContentFactory.__init__(self)
 
257
        self.sha1 = sha1
 
258
        self.key = (version,)
 
259
        self.parents = tuple((parent,) for parent in parents)
 
260
        if build_details[0] == 'line-delta':
 
261
            kind = 'delta'
 
262
        else:
 
263
            kind = 'ft'
 
264
        if annotated:
 
265
            annotated_kind = 'annotated-'
 
266
        else:
 
267
            annotated_kind = ''
 
268
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
269
        self._raw_record = raw_record
 
270
        self._build_details = build_details
 
271
        self._knit = knit
 
272
 
 
273
    def get_bytes_as(self, storage_kind):
 
274
        if storage_kind == self.storage_kind:
 
275
            return self._raw_record
 
276
        if storage_kind == 'fulltext' and self._knit is not None:
 
277
            return self._knit.get_text(self.key[0])
 
278
        else:
 
279
            raise errors.UnavailableRepresentation(self.key, storage_kind,
 
280
                self.storage_kind)
 
281
 
 
282
 
141
283
class KnitContent(object):
142
284
    """Content of a knit version to which deltas can be applied."""
143
285
 
144
286
    def __init__(self):
145
287
        self._should_strip_eol = False
146
288
 
147
 
    def annotate(self):
148
 
        """Return a list of (origin, text) tuples."""
149
 
        return list(self.annotate_iter())
150
 
 
151
289
    def apply_delta(self, delta, new_version_id):
152
290
        """Apply delta to this object to become new_version_id."""
153
291
        raise NotImplementedError(self.apply_delta)
206
344
        KnitContent.__init__(self)
207
345
        self._lines = lines
208
346
 
209
 
    def annotate_iter(self):
210
 
        """Yield tuples of (origin, text) for each content line."""
211
 
        return iter(self._lines)
 
347
    def annotate(self):
 
348
        """Return a list of (origin, text) for each content line."""
 
349
        return list(self._lines)
212
350
 
213
351
    def apply_delta(self, delta, new_version_id):
214
352
        """Apply delta to this object to become new_version_id."""
235
373
                % (e,))
236
374
 
237
375
        if self._should_strip_eol:
238
 
            anno, line = lines[-1]
239
 
            lines[-1] = (anno, line.rstrip('\n'))
 
376
            lines[-1] = lines[-1].rstrip('\n')
240
377
        return lines
241
378
 
242
379
    def copy(self):
256
393
        self._lines = lines
257
394
        self._version_id = version_id
258
395
 
259
 
    def annotate_iter(self):
260
 
        """Yield tuples of (origin, text) for each content line."""
261
 
        for line in self._lines:
262
 
            yield self._version_id, line
 
396
    def annotate(self):
 
397
        """Return a list of (origin, text) for each content line."""
 
398
        return [(self._version_id, line) for line in self._lines]
263
399
 
264
400
    def apply_delta(self, delta, new_version_id):
265
401
        """Apply delta to this object to become new_version_id."""
306
442
        """
307
443
        method, noeol = record_details
308
444
        if method == 'line-delta':
309
 
            assert base_content is not None
310
445
            if copy_base_content:
311
446
                content = base_content.copy()
312
447
            else:
427
562
                       for origin, text in lines)
428
563
        return out
429
564
 
430
 
    def annotate_iter(self, knit, version_id):
 
565
    def annotate(self, knit, version_id):
431
566
        content = knit._get_content(version_id)
432
 
        return content.annotate_iter()
 
567
        return content.annotate()
433
568
 
434
569
 
435
570
class KnitPlainFactory(_KnitFactory):
489
624
            out.extend(lines)
490
625
        return out
491
626
 
492
 
    def annotate_iter(self, knit, version_id):
 
627
    def annotate(self, knit, version_id):
493
628
        annotator = _KnitAnnotator(knit)
494
 
        return iter(annotator.annotate(version_id))
 
629
        return annotator.annotate(version_id)
495
630
 
496
631
 
497
632
def make_empty_knit(transport, relpath):
498
633
    """Construct a empty knit at the specified location."""
499
 
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
 
634
    k = make_file_knit(transport, relpath, 'w', KnitPlainFactory)
 
635
 
 
636
 
 
637
def make_file_knit(name, transport, file_mode=None, access_mode='w',
 
638
    factory=None, delta=True, create=False, create_parent_dir=False,
 
639
    delay_create=False, dir_mode=None, get_scope=None):
 
640
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
 
641
    if factory is None:
 
642
        factory = KnitAnnotateFactory()
 
643
    if get_scope is None:
 
644
        get_scope = lambda:None
 
645
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
 
646
        access_mode, create=create, file_mode=file_mode,
 
647
        create_parent_dir=create_parent_dir, delay_create=delay_create,
 
648
        dir_mode=dir_mode, get_scope=get_scope)
 
649
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
 
650
        dir_mode, ((create and not len(index)) and delay_create),
 
651
        create_parent_dir)
 
652
    return KnitVersionedFile(name, transport, factory=factory,
 
653
        create=create, delay_create=delay_create, index=index,
 
654
        access_method=access)
 
655
 
 
656
 
 
657
def get_suffixes():
 
658
    """Return the suffixes used by file based knits."""
 
659
    return [DATA_SUFFIX, INDEX_SUFFIX]
 
660
make_file_knit.get_suffixes = get_suffixes
500
661
 
501
662
 
502
663
class KnitVersionedFile(VersionedFile):
514
675
    stored and retrieved.
515
676
    """
516
677
 
517
 
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
 
678
    def __init__(self, relpath, transport, file_mode=None,
518
679
        factory=None, delta=True, create=False, create_parent_dir=False,
519
680
        delay_create=False, dir_mode=None, index=None, access_method=None):
520
681
        """Construct a knit at location specified by relpath.
527
688
            actually be created until the first data is stored.
528
689
        :param index: An index to use for the knit.
529
690
        """
530
 
        if access_mode is None:
531
 
            access_mode = 'w'
532
 
        super(KnitVersionedFile, self).__init__(access_mode)
533
 
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
 
691
        super(KnitVersionedFile, self).__init__()
534
692
        self.transport = transport
535
693
        self.filename = relpath
536
694
        self.factory = factory or KnitAnnotateFactory()
537
 
        self.writable = (access_mode == 'w')
538
695
        self.delta = delta
539
696
 
540
697
        self._max_delta_chain = 200
541
698
 
542
 
        if index is None:
543
 
            self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
544
 
                access_mode, create=create, file_mode=file_mode,
545
 
                create_parent_dir=create_parent_dir, delay_create=delay_create,
546
 
                dir_mode=dir_mode)
547
 
        else:
548
 
            self._index = index
549
 
        if access_method is None:
550
 
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
551
 
                ((create and not len(self)) and delay_create), create_parent_dir)
552
 
        else:
553
 
            _access = access_method
 
699
        if None in (access_method, index):
 
700
            raise ValueError("No default access_method or index any more")
 
701
        self._index = index
 
702
        _access = access_method
554
703
        if create and not len(self) and not delay_create:
555
704
            _access.create()
556
705
        self._data = _KnitData(_access)
588
737
 
589
738
        return fulltext_size > delta_size
590
739
 
 
740
    def _check_write_ok(self):
 
741
        return self._index._check_write_ok()
 
742
 
591
743
    def _add_raw_records(self, records, data):
592
744
        """Add all the records 'records' with data pre-joined in 'data'.
593
745
 
599
751
        # write all the data
600
752
        raw_record_sizes = [record[3] for record in records]
601
753
        positions = self._data.add_raw_records(raw_record_sizes, data)
602
 
        offset = 0
603
754
        index_entries = []
604
 
        for (version_id, options, parents, size), access_memo in zip(
 
755
        for (version_id, options, parents, _), access_memo in zip(
605
756
            records, positions):
606
757
            index_entries.append((version_id, options, access_memo, parents))
607
 
            if self._data._do_cache:
608
 
                self._data._cache[version_id] = data[offset:offset+size]
609
 
            offset += size
610
758
        self._index.add_versions(index_entries)
611
759
 
612
 
    def enable_cache(self):
613
 
        """Start caching data for this knit"""
614
 
        self._data.enable_cache()
615
 
 
616
 
    def clear_cache(self):
617
 
        """Clear the data cache only."""
618
 
        self._data.clear_cache()
619
 
 
620
760
    def copy_to(self, name, transport):
621
761
        """See VersionedFile.copy_to()."""
622
762
        # copy the current index to a temp index to avoid racing with local
632
772
        # move the copied index into place
633
773
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
634
774
 
635
 
    def create_empty(self, name, transport, mode=None):
636
 
        return KnitVersionedFile(name, transport, factory=self.factory,
637
 
                                 delta=self.delta, create=True)
638
 
    
639
775
    def get_data_stream(self, required_versions):
640
776
        """Get a data stream for the specified versions.
641
777
 
693
829
                # put them in anywhere, but we hope that sending them soon
694
830
                # after the fulltext will give good locality in the receiver
695
831
                ready_to_send[:0] = deferred.pop(version_id)
696
 
        assert len(deferred) == 0, \
697
 
            "Still have compressed child versions waiting to be sent"
 
832
        if not (len(deferred) == 0):
 
833
            raise AssertionError("Still have compressed child versions waiting to be sent")
698
834
        # XXX: The stream format is such that we cannot stream it - we have to
699
835
        # know the length of all the data a-priori.
700
836
        raw_datum = []
701
837
        result_version_list = []
702
 
        for (version_id, raw_data), \
 
838
        for (version_id, raw_data, _), \
703
839
            (version_id2, options, _, parents) in \
704
840
            izip(self._data.read_records_iter_raw(copy_queue_records),
705
841
                 temp_version_list):
706
 
            assert version_id == version_id2, \
707
 
                'logic error, inconsistent results'
 
842
            if not (version_id == version_id2):
 
843
                raise AssertionError('logic error, inconsistent results')
708
844
            raw_datum.append(raw_data)
709
845
            result_version_list.append(
710
846
                (version_id, options, len(raw_data), parents))
717
853
                return pseudo_file.read(length)
718
854
        return (self.get_format_signature(), result_version_list, read)
719
855
 
 
856
    def get_record_stream(self, versions, ordering, include_delta_closure):
 
857
        """Get a stream of records for versions.
 
858
 
 
859
        :param versions: The versions to include. Each version is a tuple
 
860
            (version,).
 
861
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
862
            sorted stream has compression parents strictly before their
 
863
            children.
 
864
        :param include_delta_closure: If True then the closure across any
 
865
            compression parents will be included (in the opaque data).
 
866
        :return: An iterator of ContentFactory objects, each of which is only
 
867
            valid until the iterator is advanced.
 
868
        """
 
869
        if include_delta_closure:
 
870
            # Nb: what we should do is plan the data to stream to allow
 
871
            # reconstruction of all the texts without excessive buffering,
 
872
            # including re-sending common bases as needed. This makes the most
 
873
            # sense when we start serialising these streams though, so for now
 
874
            # we just fallback to individual text construction behind the
 
875
            # abstraction barrier.
 
876
            knit = self
 
877
        else:
 
878
            knit = None
 
879
        # We end up doing multiple index lookups here for parents details and
 
880
        # disk layout details - we need a unified api ?
 
881
        parent_map = self.get_parent_map(versions)
 
882
        absent_versions = set(versions) - set(parent_map)
 
883
        if ordering == 'topological':
 
884
            present_versions = topo_sort(parent_map)
 
885
        else:
 
886
            # List comprehension to keep the requested order (as that seems
 
887
            # marginally useful, at least until we start doing IO optimising
 
888
            # here.
 
889
            present_versions = [version for version in versions if version in
 
890
                parent_map]
 
891
        position_map = self._get_components_positions(present_versions)
 
892
        records = [(version, position_map[version][1]) for version in
 
893
            present_versions]
 
894
        record_map = {}
 
895
        for version in absent_versions:
 
896
            yield AbsentContentFactory((version,))
 
897
        for version, raw_data, sha1 in \
 
898
                self._data.read_records_iter_raw(records):
 
899
            (record_details, index_memo, _) = position_map[version]
 
900
            yield KnitContentFactory(version, parent_map[version],
 
901
                record_details, sha1, raw_data, self.factory.annotated, knit)
 
902
 
720
903
    def _extract_blocks(self, version_id, source, target):
721
904
        if self._index.get_method(version_id) != 'line-delta':
722
905
            return None
757
940
            annotated_part = "plain"
758
941
        return "knit-%s" % (annotated_part,)
759
942
        
760
 
    @deprecated_method(one_four)
761
 
    def get_graph_with_ghosts(self):
762
 
        """See VersionedFile.get_graph_with_ghosts()."""
763
 
        return self.get_parent_map(self.versions())
764
 
 
765
 
    def get_sha1(self, version_id):
766
 
        return self.get_sha1s([version_id])[0]
767
 
 
768
943
    def get_sha1s(self, version_ids):
769
 
        """See VersionedFile.get_sha1()."""
 
944
        """See VersionedFile.get_sha1s()."""
770
945
        record_map = self._get_record_map(version_ids)
771
946
        # record entry 2 is the 'digest'.
772
947
        return [record_map[v][2] for v in version_ids]
773
948
 
774
 
    @staticmethod
775
 
    def get_suffixes():
776
 
        """See VersionedFile.get_suffixes()."""
777
 
        return [DATA_SUFFIX, INDEX_SUFFIX]
778
 
 
779
 
    @deprecated_method(one_four)
780
 
    def has_ghost(self, version_id):
781
 
        """True if there is a ghost reference in the file to version_id."""
782
 
        # maybe we have it
783
 
        if self.has_version(version_id):
784
 
            return False
785
 
        # optimisable if needed by memoising the _ghosts set.
786
 
        items = self.get_parent_map(self.versions())
787
 
        for parents in items.itervalues():
788
 
            for parent in parents:
789
 
                if parent == version_id and parent not in items:
790
 
                    return True
791
 
        return False
792
 
 
793
949
    def insert_data_stream(self, (format, data_list, reader_callable)):
794
950
        """Insert knit records from a data stream into this knit.
795
951
 
806
962
                    'incompatible format signature inserting to %r', self)
807
963
            source = self._knit_from_datastream(
808
964
                (format, data_list, reader_callable))
809
 
            self.join(source)
 
965
            stream = source.get_record_stream(source.versions(), 'unordered', False)
 
966
            self.insert_record_stream(stream)
810
967
            return
811
968
 
812
969
        for version_id, options, length, parents in data_list:
824
981
                # Also check the SHA-1 of the fulltext this content will
825
982
                # produce.
826
983
                raw_data = reader_callable(length)
827
 
                my_fulltext_sha1 = self.get_sha1(version_id)
 
984
                my_fulltext_sha1 = self.get_sha1s([version_id])[0]
828
985
                df, rec = self._data._parse_record_header(version_id, raw_data)
829
986
                stream_fulltext_sha1 = rec[3]
830
987
                if my_fulltext_sha1 != stream_fulltext_sha1:
851
1008
                            'on the source repository, and "bzr reconcile" '
852
1009
                            'if necessary.' %
853
1010
                            (version_id, parents[0]))
 
1011
                    if not self.delta:
 
1012
                        # We received a line-delta record for a non-delta knit.
 
1013
                        # Convert it to a fulltext.
 
1014
                        gzip_bytes = reader_callable(length)
 
1015
                        self._convert_line_delta_to_fulltext(
 
1016
                            gzip_bytes, version_id, parents)
 
1017
                        continue
 
1018
 
854
1019
                self._add_raw_records(
855
1020
                    [(version_id, options, parents, length)],
856
1021
                    reader_callable(length))
857
1022
 
 
1023
    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
 
1024
        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
 
1025
        delta = self.factory.parse_line_delta(lines, version_id)
 
1026
        content = self.factory.make(self.get_lines(parents[0]), parents[0])
 
1027
        content.apply_delta(delta, version_id)
 
1028
        digest, len, content = self.add_lines(
 
1029
            version_id, parents, content.text())
 
1030
        if digest != sha1:
 
1031
            raise errors.VersionedFileInvalidChecksum(version_id)
 
1032
 
858
1033
    def _knit_from_datastream(self, (format, data_list, reader_callable)):
859
1034
        """Create a knit object from a data stream.
860
1035
 
877
1052
        return KnitVersionedFile(self.filename, self.transport,
878
1053
            factory=factory, index=index, access_method=access)
879
1054
 
 
1055
    def insert_record_stream(self, stream):
 
1056
        """Insert a record stream into this versioned file.
 
1057
 
 
1058
        :param stream: A stream of records to insert. 
 
1059
        :return: None
 
1060
        :seealso VersionedFile.get_record_stream:
 
1061
        """
 
1062
        def get_adapter(adapter_key):
 
1063
            try:
 
1064
                return adapters[adapter_key]
 
1065
            except KeyError:
 
1066
                adapter_factory = adapter_registry.get(adapter_key)
 
1067
                adapter = adapter_factory(self)
 
1068
                adapters[adapter_key] = adapter
 
1069
                return adapter
 
1070
        if self.factory.annotated:
 
1071
            # self is annotated, we need annotated knits to use directly.
 
1072
            annotated = "annotated-"
 
1073
            convertibles = []
 
1074
        else:
 
1075
            # self is not annotated, but we can strip annotations cheaply.
 
1076
            annotated = ""
 
1077
            convertibles = set(["knit-annotated-delta-gz",
 
1078
                "knit-annotated-ft-gz"])
 
1079
        # The set of types we can cheaply adapt without needing basis texts.
 
1080
        native_types = set()
 
1081
        native_types.add("knit-%sdelta-gz" % annotated)
 
1082
        native_types.add("knit-%sft-gz" % annotated)
 
1083
        knit_types = native_types.union(convertibles)
 
1084
        adapters = {}
 
1085
        # Buffer all index entries that we can't add immediately because their
 
1086
        # basis parent is missing. We don't buffer all because generating
 
1087
        # annotations may require access to some of the new records. However we
 
1088
        # can't generate annotations from new deltas until their basis parent
 
1089
        # is present anyway, so we get away with not needing an index that
 
1090
        # includes the new keys.
 
1091
        # key = basis_parent, value = index entry to add
 
1092
        buffered_index_entries = {}
 
1093
        for record in stream:
 
1094
            # Raise an error when a record is missing.
 
1095
            if record.storage_kind == 'absent':
 
1096
                raise RevisionNotPresent([record.key[0]], self)
 
1097
            # adapt to non-tuple interface
 
1098
            parents = [parent[0] for parent in record.parents]
 
1099
            if record.storage_kind in knit_types:
 
1100
                if record.storage_kind not in native_types:
 
1101
                    try:
 
1102
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1103
                        adapter = get_adapter(adapter_key)
 
1104
                    except KeyError:
 
1105
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1106
                        adapter = get_adapter(adapter_key)
 
1107
                    bytes = adapter.get_bytes(
 
1108
                        record, record.get_bytes_as(record.storage_kind))
 
1109
                else:
 
1110
                    bytes = record.get_bytes_as(record.storage_kind)
 
1111
                options = [record._build_details[0]]
 
1112
                if record._build_details[1]:
 
1113
                    options.append('no-eol')
 
1114
                # Just blat it across.
 
1115
                # Note: This does end up adding data on duplicate keys. As
 
1116
                # modern repositories use atomic insertions this should not
 
1117
                # lead to excessive growth in the event of interrupted fetches.
 
1118
                # 'knit' repositories may suffer excessive growth, but as a
 
1119
                # deprecated format this is tolerable. It can be fixed if
 
1120
                # needed by in the kndx index support raising on a duplicate
 
1121
                # add with identical parents and options.
 
1122
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
 
1123
                index_entry = (record.key[0], options, access_memo, parents)
 
1124
                buffered = False
 
1125
                if 'fulltext' not in options:
 
1126
                    basis_parent = parents[0]
 
1127
                    if not self.has_version(basis_parent):
 
1128
                        pending = buffered_index_entries.setdefault(
 
1129
                            basis_parent, [])
 
1130
                        pending.append(index_entry)
 
1131
                        buffered = True
 
1132
                if not buffered:
 
1133
                    self._index.add_versions([index_entry])
 
1134
            elif record.storage_kind == 'fulltext':
 
1135
                self.add_lines(record.key[0], parents,
 
1136
                    split_lines(record.get_bytes_as('fulltext')))
 
1137
            else:
 
1138
                adapter_key = record.storage_kind, 'fulltext'
 
1139
                adapter = get_adapter(adapter_key)
 
1140
                lines = split_lines(adapter.get_bytes(
 
1141
                    record, record.get_bytes_as(record.storage_kind)))
 
1142
                try:
 
1143
                    self.add_lines(record.key[0], parents, lines)
 
1144
                except errors.RevisionAlreadyPresent:
 
1145
                    pass
 
1146
            # Add any records whose basis parent is now available.
 
1147
            added_keys = [record.key[0]]
 
1148
            while added_keys:
 
1149
                key = added_keys.pop(0)
 
1150
                if key in buffered_index_entries:
 
1151
                    index_entries = buffered_index_entries[key]
 
1152
                    self._index.add_versions(index_entries)
 
1153
                    added_keys.extend(
 
1154
                        [index_entry[0] for index_entry in index_entries])
 
1155
                    del buffered_index_entries[key]
 
1156
        # If there were any deltas which had a missing basis parent, error.
 
1157
        if buffered_index_entries:
 
1158
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
 
1159
                self)
 
1160
 
880
1161
    def versions(self):
881
1162
        """See VersionedFile.versions."""
882
1163
        if 'evil' in debug.debug_flags:
1057
1338
            # I/O and the time spend applying deltas.
1058
1339
            delta = self._check_should_delta(present_parents)
1059
1340
 
1060
 
        assert isinstance(version_id, str)
1061
1341
        content = self.factory.make(lines, version_id)
1062
1342
        if delta or (self.factory.annotated and len(present_parents) > 0):
1063
1343
            # Merge annotations from parent texts if needed.
1093
1373
 
1094
1374
    def check(self, progress_bar=None):
1095
1375
        """See VersionedFile.check()."""
1096
 
 
1097
 
    def _clone_text(self, new_version_id, old_version_id, parents):
1098
 
        """See VersionedFile.clone_text()."""
1099
 
        # FIXME RBC 20060228 make fast by only inserting an index with null 
1100
 
        # delta.
1101
 
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))
 
1376
        # This doesn't actually test extraction of everything, but that will
 
1377
        # impact 'bzr check' substantially, and needs to be integrated with
 
1378
        # care. However, it does check for the obvious problem of a delta with
 
1379
        # no basis.
 
1380
        versions = self.versions()
 
1381
        parent_map = self.get_parent_map(versions)
 
1382
        for version in versions:
 
1383
            if self._index.get_method(version) != 'fulltext':
 
1384
                compression_parent = parent_map[version][0]
 
1385
                if compression_parent not in parent_map:
 
1386
                    raise errors.KnitCorrupt(self,
 
1387
                        "Missing basis parent %s for %s" % (
 
1388
                        compression_parent, version))
1102
1389
 
1103
1390
    def get_lines(self, version_id):
1104
1391
        """See VersionedFile.get_lines()."""
1229
1516
            enumerate(self._data.read_records_iter(version_id_records)):
1230
1517
            pb.update('Walking content.', version_idx, total)
1231
1518
            method = self._index.get_method(version_id)
1232
 
 
1233
 
            assert method in ('fulltext', 'line-delta')
1234
1519
            if method == 'fulltext':
1235
1520
                line_iterator = self.factory.get_fulltext_content(data)
1236
 
            else:
 
1521
            elif method == 'line-delta':
1237
1522
                line_iterator = self.factory.get_linedelta_content(data)
 
1523
            else:
 
1524
                raise ValueError('invalid method %r' % (method,))
1238
1525
            # XXX: It might be more efficient to yield (version_id,
1239
1526
            # line_iterator) in the future. However for now, this is a simpler
1240
1527
            # change to integrate into the rest of the codebase. RBC 20071110
1243
1530
 
1244
1531
        pb.update('Walking content.', total, total)
1245
1532
        
1246
 
    def iter_parents(self, version_ids):
1247
 
        """Iterate through the parents for many version ids.
1248
 
 
1249
 
        :param version_ids: An iterable yielding version_ids.
1250
 
        :return: An iterator that yields (version_id, parents). Requested 
1251
 
            version_ids not present in the versioned file are simply skipped.
1252
 
            The order is undefined, allowing for different optimisations in
1253
 
            the underlying implementation.
1254
 
        """
1255
 
        return self._index.iter_parents(version_ids)
1256
 
 
1257
1533
    def num_versions(self):
1258
1534
        """See VersionedFile.num_versions()."""
1259
1535
        return self._index.num_versions()
1260
1536
 
1261
1537
    __len__ = num_versions
1262
1538
 
1263
 
    def annotate_iter(self, version_id):
1264
 
        """See VersionedFile.annotate_iter."""
1265
 
        return self.factory.annotate_iter(self, version_id)
 
1539
    def annotate(self, version_id):
 
1540
        """See VersionedFile.annotate."""
 
1541
        return self.factory.annotate(self, version_id)
1266
1542
 
1267
1543
    def get_parent_map(self, version_ids):
1268
1544
        """See VersionedFile.get_parent_map."""
1404
1680
                                   parents,
1405
1681
                                   index)
1406
1682
 
 
1683
    def _check_write_ok(self):
 
1684
        if self._get_scope() != self._scope:
 
1685
            raise errors.OutSideTransaction()
 
1686
        if self._mode != 'w':
 
1687
            raise errors.ReadOnlyObjectDirtiedError(self)
 
1688
 
1407
1689
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
1408
 
                 create_parent_dir=False, delay_create=False, dir_mode=None):
 
1690
        create_parent_dir=False, delay_create=False, dir_mode=None,
 
1691
        get_scope=None):
1409
1692
        _KnitComponentFile.__init__(self, transport, filename, mode,
1410
1693
                                    file_mode=file_mode,
1411
1694
                                    create_parent_dir=create_parent_dir,
1432
1715
            else:
1433
1716
                self._transport.put_bytes_non_atomic(
1434
1717
                    self._filename, self.HEADER, mode=self._file_mode)
 
1718
        self._scope = get_scope()
 
1719
        self._get_scope = get_scope
1435
1720
 
1436
1721
    def get_ancestry(self, versions, topo_sorted=True):
1437
1722
        """See VersionedFile.get_ancestry."""
1509
1794
                                  parents, (method, noeol))
1510
1795
        return result
1511
1796
 
1512
 
    def iter_parents(self, version_ids):
1513
 
        """Iterate through the parents for many version ids.
1514
 
 
1515
 
        :param version_ids: An iterable yielding version_ids.
1516
 
        :return: An iterator that yields (version_id, parents). Requested 
1517
 
            version_ids not present in the versioned file are simply skipped.
1518
 
            The order is undefined, allowing for different optimisations in
1519
 
            the underlying implementation.
1520
 
        """
1521
 
        parent_map = self.get_parent_map(version_ids)
1522
 
        parent_map_set = set(parent_map)
1523
 
        unknown_existence = set()
1524
 
        for parents in parent_map.itervalues():
1525
 
            unknown_existence.update(parents)
1526
 
        unknown_existence.difference_update(parent_map_set)
1527
 
        present_parents = set(self.get_parent_map(unknown_existence))
1528
 
        present_parents.update(parent_map_set)
1529
 
        for version_id, parents in parent_map.iteritems():
1530
 
            parents = tuple(parent for parent in parents
1531
 
                if parent in present_parents)
1532
 
            yield version_id, parents
1533
 
 
1534
1797
    def num_versions(self):
1535
1798
        return len(self._history)
1536
1799
 
1575
1838
                                               pos,
1576
1839
                                               size,
1577
1840
                                               self._version_list_to_index(parents))
1578
 
                assert isinstance(line, str), \
1579
 
                    'content must be utf-8 encoded: %r' % (line,)
1580
1841
                lines.append(line)
1581
1842
                self._cache_version(version_id, options, pos, size, tuple(parents))
1582
1843
            if not self._need_to_create:
1680
1941
            raise KnitCorrupt(self, "Cannot do delta compression without "
1681
1942
                "parent tracking.")
1682
1943
 
 
1944
    def _check_write_ok(self):
 
1945
        pass
 
1946
 
1683
1947
    def _get_entries(self, keys, check_present=False):
1684
1948
        """Get the entries for keys.
1685
1949
        
1839
2103
        compression_parents = an_entry[3][1]
1840
2104
        if not compression_parents:
1841
2105
            return None
1842
 
        assert len(compression_parents) == 1
1843
2106
        return compression_parents[0]
1844
2107
 
1845
2108
    def _get_method(self, node):
1850
2113
        else:
1851
2114
            return 'fulltext'
1852
2115
 
1853
 
    def iter_parents(self, version_ids):
1854
 
        """Iterate through the parents for many version ids.
1855
 
 
1856
 
        :param version_ids: An iterable yielding version_ids.
1857
 
        :return: An iterator that yields (version_id, parents). Requested 
1858
 
            version_ids not present in the versioned file are simply skipped.
1859
 
            The order is undefined, allowing for different optimisations in
1860
 
            the underlying implementation.
1861
 
        """
1862
 
        if self._parents:
1863
 
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
1864
 
            all_parents = set()
1865
 
            present_parents = set()
1866
 
            for node in all_nodes:
1867
 
                all_parents.update(node[3][0])
1868
 
                # any node we are querying must be present
1869
 
                present_parents.add(node[1])
1870
 
            unknown_parents = all_parents.difference(present_parents)
1871
 
            present_parents.update(self._present_keys(unknown_parents))
1872
 
            for node in all_nodes:
1873
 
                parents = []
1874
 
                for parent in node[3][0]:
1875
 
                    if parent in present_parents:
1876
 
                        parents.append(parent[0])
1877
 
                yield node[1][0], tuple(parents)
1878
 
        else:
1879
 
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
1880
 
                yield node[1][0], ()
1881
 
 
1882
2116
    def num_versions(self):
1883
2117
        return len(list(self._graph_index.iter_all_entries()))
1884
2118
 
2055
2289
            tuple - (index, pos, length), where the index field is always None
2056
2290
            for the .knit access method.
2057
2291
        """
2058
 
        assert type(raw_data) == str, \
2059
 
            'data must be plain bytes was %s' % type(raw_data)
2060
2292
        if not self._need_to_create:
2061
2293
            base = self._transport.append_bytes(self._filename, raw_data)
2062
2294
        else:
2139
2371
            tuple - (index, pos, length), where the index field is the 
2140
2372
            write_index object supplied to the PackAccess object.
2141
2373
        """
2142
 
        assert type(raw_data) == str, \
2143
 
            'data must be plain bytes was %s' % type(raw_data)
2144
2374
        result = []
2145
2375
        offset = 0
2146
2376
        for size in sizes:
2229
2459
    def get_raw_records(self, memos_for_retrieval):
2230
2460
        """Get the raw bytes for a records.
2231
2461
 
2232
 
        :param memos_for_retrieval: An iterable containing the (thunk_flag,
2233
 
            index, start, end) memo for retrieving the bytes.
2234
 
        :return: An iterator over the bytes of the records.
 
2462
        :param memos_for_retrieval: An iterable of memos from the
 
2463
            _StreamIndex object identifying bytes to read; for these classes
 
2464
            they are (from_backing_knit, index, start, end) and can point to
 
2465
            either the backing knit or streamed data.
 
2466
        :return: An iterator yielding a byte string for each record in 
 
2467
            memos_for_retrieval.
2235
2468
        """
2236
2469
        # use a generator for memory friendliness
2237
 
        for thunk_flag, version_id, start, end in memos_for_retrieval:
2238
 
            if version_id is self.stream_index:
 
2470
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
 
2471
            if not from_backing_knit:
 
2472
                if version_id is not self.stream_index:
 
2473
                    raise AssertionError()
2239
2474
                yield self.data[start:end]
2240
2475
                continue
2241
2476
            # we have been asked to thunk. This thunking only occurs when
2246
2481
            # as desired. However, for now, this is sufficient.
2247
2482
            if self.orig_factory.__class__ != KnitPlainFactory:
2248
2483
                raise errors.KnitCorrupt(
2249
 
                    self, 'Bad thunk request %r' % version_id)
 
2484
                    self, 'Bad thunk request %r cannot be backed by %r' %
 
2485
                        (version_id, self.orig_factory))
2250
2486
            lines = self.backing_knit.get_lines(version_id)
2251
2487
            line_bytes = ''.join(lines)
2252
2488
            digest = sha_string(line_bytes)
 
2489
            # the packed form of the fulltext always has a trailing newline,
 
2490
            # even if the actual text does not, unless the file is empty.  the
 
2491
            # record options including the noeol flag are passed through by
 
2492
            # _StreamIndex, so this is safe.
2253
2493
            if lines:
2254
2494
                if lines[-1][-1] != '\n':
2255
2495
                    lines[-1] = lines[-1] + '\n'
2256
2496
                    line_bytes += '\n'
2257
 
            orig_options = list(self.backing_knit._index.get_options(version_id))
2258
 
            if 'fulltext' not in orig_options:
2259
 
                if 'line-delta' not in orig_options:
2260
 
                    raise errors.KnitCorrupt(self,
2261
 
                        'Unknown compression method %r' % orig_options)
2262
 
                orig_options.remove('line-delta')
2263
 
                orig_options.append('fulltext')
2264
2497
            # We want plain data, because we expect to thunk only to allow text
2265
2498
            # extraction.
2266
2499
            size, bytes = self.backing_knit._data._record_to_data(version_id,
2318
2551
        :return: A dict of version_id:(index_memo, compression_parent,
2319
2552
                                       parents, record_details).
2320
2553
            index_memo
2321
 
                opaque structure to pass to read_records to extract the raw
2322
 
                data
 
2554
                opaque memo that can be passed to _StreamAccess.read_records
 
2555
                to extract the raw data; for these classes it is
 
2556
                (from_backing_knit, index, start, end) 
2323
2557
            compression_parent
2324
2558
                Content that this record is built upon, may be None
2325
2559
            parents
2337
2571
                continue
2338
2572
            parent_ids = self.get_parents_with_ghosts(version_id)
2339
2573
            noeol = ('no-eol' in self.get_options(version_id))
 
2574
            index_memo = self.get_position(version_id)
 
2575
            from_backing_knit = index_memo[0]
 
2576
            if from_backing_knit:
 
2577
                # texts retrieved from the backing knit are always full texts
 
2578
                method = 'fulltext'
2340
2579
            if method == 'fulltext':
2341
2580
                compression_parent = None
2342
2581
            else:
2343
2582
                compression_parent = parent_ids[0]
2344
 
            index_memo = self.get_position(version_id)
2345
2583
            result[version_id] = (index_memo, compression_parent,
2346
2584
                                  parent_ids, (method, noeol))
2347
2585
        return result
2348
2586
 
2349
2587
    def get_method(self, version_id):
2350
2588
        """Return compression method of specified version."""
2351
 
        try:
2352
 
            options = self._by_version[version_id][0]
2353
 
        except KeyError:
2354
 
            # Strictly speaking this should check in the backing knit, but
2355
 
            # until we have a test to discriminate, this will do.
2356
 
            return self.backing_index.get_method(version_id)
 
2589
        options = self.get_options(version_id)
2357
2590
        if 'fulltext' in options:
2358
2591
            return 'fulltext'
2359
2592
        elif 'line-delta' in options:
2369
2602
        try:
2370
2603
            return self._by_version[version_id][0]
2371
2604
        except KeyError:
2372
 
            return self.backing_index.get_options(version_id)
 
2605
            options = list(self.backing_index.get_options(version_id))
 
2606
            if 'fulltext' in options:
 
2607
                pass
 
2608
            elif 'line-delta' in options:
 
2609
                # Texts from the backing knit are always returned from the stream
 
2610
                # as full texts
 
2611
                options.remove('line-delta')
 
2612
                options.append('fulltext')
 
2613
            else:
 
2614
                raise errors.KnitIndexUnknownMethod(self, options)
 
2615
            return tuple(options)
2373
2616
 
2374
2617
    def get_parent_map(self, version_ids):
2375
2618
        """Passed through to by KnitVersionedFile.get_parent_map."""
2397
2640
        coordinates into that (as index_memo's are opaque outside the
2398
2641
        index and matching access class).
2399
2642
 
2400
 
        :return: a tuple (thunk_flag, index, start, end).  If thunk_flag is
2401
 
            False, index will be self, otherwise it will be a version id.
 
2643
        :return: a tuple (from_backing_knit, index, start, end) that can 
 
2644
            be passed e.g. to get_raw_records.  
 
2645
            If from_backing_knit is False, index will be self, otherwise it
 
2646
            will be a version id.
2402
2647
        """
2403
2648
        try:
2404
2649
            start, end = self._by_version[version_id][1]
2411
2656
        """Get all the versions in the stream."""
2412
2657
        return self._by_version.keys()
2413
2658
 
2414
 
    def iter_parents(self, version_ids):
2415
 
        """Iterate through the parents for many version ids.
2416
 
 
2417
 
        :param version_ids: An iterable yielding version_ids.
2418
 
        :return: An iterator that yields (version_id, parents). Requested 
2419
 
            version_ids not present in the versioned file are simply skipped.
2420
 
            The order is undefined, allowing for different optimisations in
2421
 
            the underlying implementation.
2422
 
        """
2423
 
        result = []
2424
 
        for version in version_ids:
2425
 
            try:
2426
 
                result.append((version, self._by_version[version][2]))
2427
 
            except KeyError:
2428
 
                pass
2429
 
        return result
2430
 
 
2431
2659
 
2432
2660
class _KnitData(object):
2433
2661
    """Manage extraction of data from a KnitAccess, caching and decompressing.
2445
2673
        """
2446
2674
        self._access = access
2447
2675
        self._checked = False
2448
 
        # TODO: jam 20060713 conceptually, this could spill to disk
2449
 
        #       if the cached size gets larger than a certain amount
2450
 
        #       but it complicates the model a bit, so for now just use
2451
 
        #       a simple dictionary
2452
 
        self._cache = {}
2453
 
        self._do_cache = False
2454
 
 
2455
 
    def enable_cache(self):
2456
 
        """Enable caching of reads."""
2457
 
        self._do_cache = True
2458
 
 
2459
 
    def clear_cache(self):
2460
 
        """Clear the record cache."""
2461
 
        self._do_cache = False
2462
 
        self._cache = {}
2463
2676
 
2464
2677
    def _open_file(self):
2465
2678
        return self._access.open_file()
2484
2697
                                     digest)],
2485
2698
            dense_lines or lines,
2486
2699
            ["end %s\n" % version_id]))
2487
 
        assert bytes.__class__ == str
2488
2700
        compressed_bytes = bytes_to_gzip(bytes)
2489
2701
        return len(compressed_bytes), compressed_bytes
2490
2702
 
2514
2726
                              % (version_id, e.__class__.__name__, str(e)))
2515
2727
        return df, rec
2516
2728
 
2517
 
    def _check_header(self, version_id, line):
 
2729
    def _split_header(self, line):
2518
2730
        rec = line.split()
2519
2731
        if len(rec) != 4:
2520
2732
            raise KnitCorrupt(self._access,
2521
2733
                              'unexpected number of elements in record header')
 
2734
        return rec
 
2735
 
 
2736
    def _check_header_version(self, rec, version_id):
2522
2737
        if rec[1] != version_id:
2523
2738
            raise KnitCorrupt(self._access,
2524
2739
                              'unexpected version, wanted %r, got %r'
2525
2740
                              % (version_id, rec[1]))
 
2741
 
 
2742
    def _check_header(self, version_id, line):
 
2743
        rec = self._split_header(line)
 
2744
        self._check_header_version(rec, version_id)
2526
2745
        return rec
2527
2746
 
2528
 
    def _parse_record(self, version_id, data):
 
2747
    def _parse_record_unchecked(self, data):
2529
2748
        # profiling notes:
2530
2749
        # 4168 calls in 2880 217 internal
2531
2750
        # 4168 calls to _parse_record_header in 2121
2532
2751
        # 4168 calls to readlines in 330
2533
2752
        df = GzipFile(mode='rb', fileobj=StringIO(data))
2534
 
 
2535
2753
        try:
2536
2754
            record_contents = df.readlines()
2537
2755
        except Exception, e:
2538
 
            raise KnitCorrupt(self._access,
2539
 
                              "While reading {%s} got %s(%s)"
2540
 
                              % (version_id, e.__class__.__name__, str(e)))
 
2756
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
 
2757
                (data, e.__class__.__name__, str(e)))
2541
2758
        header = record_contents.pop(0)
2542
 
        rec = self._check_header(version_id, header)
2543
 
 
 
2759
        rec = self._split_header(header)
2544
2760
        last_line = record_contents.pop()
2545
2761
        if len(record_contents) != int(rec[2]):
2546
2762
            raise KnitCorrupt(self._access,
2547
2763
                              'incorrect number of lines %s != %s'
2548
2764
                              ' for version {%s}'
2549
2765
                              % (len(record_contents), int(rec[2]),
2550
 
                                 version_id))
 
2766
                                 rec[1]))
2551
2767
        if last_line != 'end %s\n' % rec[1]:
2552
2768
            raise KnitCorrupt(self._access,
2553
2769
                              'unexpected version end line %r, wanted %r' 
2554
 
                              % (last_line, version_id))
 
2770
                              % (last_line, rec[1]))
2555
2771
        df.close()
 
2772
        return rec, record_contents
 
2773
 
 
2774
    def _parse_record(self, version_id, data):
 
2775
        rec, record_contents = self._parse_record_unchecked(data)
 
2776
        self._check_header_version(rec, version_id)
2556
2777
        return record_contents, rec[3]
2557
2778
 
2558
2779
    def read_records_iter_raw(self, records):
2560
2781
 
2561
2782
        This unpacks enough of the text record to validate the id is
2562
2783
        as expected but thats all.
 
2784
 
 
2785
        Each item the iterator yields is (version_id, bytes,
 
2786
        sha1_of_full_text).
2563
2787
        """
2564
2788
        # setup an iterator of the external records:
2565
2789
        # uses readv so nice and fast we hope.
2566
2790
        if len(records):
2567
2791
            # grab the disk data needed.
2568
 
            if self._cache:
2569
 
                # Don't check _cache if it is empty
2570
 
                needed_offsets = [index_memo for version_id, index_memo
2571
 
                                              in records
2572
 
                                              if version_id not in self._cache]
2573
 
            else:
2574
 
                needed_offsets = [index_memo for version_id, index_memo
2575
 
                                               in records]
2576
 
 
 
2792
            needed_offsets = [index_memo for version_id, index_memo
 
2793
                                           in records]
2577
2794
            raw_records = self._access.get_raw_records(needed_offsets)
2578
2795
 
2579
2796
        for version_id, index_memo in records:
2580
 
            if version_id in self._cache:
2581
 
                # This data has already been validated
2582
 
                data = self._cache[version_id]
2583
 
            else:
2584
 
                data = raw_records.next()
2585
 
                if self._do_cache:
2586
 
                    self._cache[version_id] = data
2587
 
 
2588
 
                # validate the header
2589
 
                df, rec = self._parse_record_header(version_id, data)
2590
 
                df.close()
2591
 
            yield version_id, data
 
2797
            data = raw_records.next()
 
2798
            # validate the header
 
2799
            df, rec = self._parse_record_header(version_id, data)
 
2800
            df.close()
 
2801
            yield version_id, data, rec[3]
2592
2802
 
2593
2803
    def read_records_iter(self, records):
2594
2804
        """Read text records from data file and yield result.
2603
2813
        if not records:
2604
2814
            return
2605
2815
 
2606
 
        if self._cache:
2607
 
            # Skip records we have alread seen
2608
 
            yielded_records = set()
2609
 
            needed_records = set()
2610
 
            for record in records:
2611
 
                if record[0] in self._cache:
2612
 
                    if record[0] in yielded_records:
2613
 
                        continue
2614
 
                    yielded_records.add(record[0])
2615
 
                    data = self._cache[record[0]]
2616
 
                    content, digest = self._parse_record(record[0], data)
2617
 
                    yield (record[0], content, digest)
2618
 
                else:
2619
 
                    needed_records.add(record)
2620
 
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
2621
 
        else:
2622
 
            needed_records = sorted(set(records), key=operator.itemgetter(1))
2623
 
 
 
2816
        needed_records = sorted(set(records), key=operator.itemgetter(1))
2624
2817
        if not needed_records:
2625
2818
            return
2626
2819
 
2632
2825
        for (version_id, index_memo), data in \
2633
2826
                izip(iter(needed_records), raw_data):
2634
2827
            content, digest = self._parse_record(version_id, data)
2635
 
            if self._do_cache:
2636
 
                self._cache[version_id] = data
2637
2828
            yield version_id, content, digest
2638
2829
 
2639
2830
    def read_records(self, records):
2648
2839
class InterKnit(InterVersionedFile):
2649
2840
    """Optimised code paths for knit to knit operations."""
2650
2841
    
2651
 
    _matching_file_from_factory = KnitVersionedFile
2652
 
    _matching_file_to_factory = KnitVersionedFile
 
2842
    _matching_file_from_factory = staticmethod(make_file_knit)
 
2843
    _matching_file_to_factory = staticmethod(make_file_knit)
2653
2844
    
2654
2845
    @staticmethod
2655
2846
    def is_compatible(source, target):
2705
2896
 
2706
2897
    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2707
2898
        """See InterVersionedFile.join."""
2708
 
        assert isinstance(self.source, KnitVersionedFile)
2709
 
        assert isinstance(self.target, KnitVersionedFile)
2710
 
 
2711
2899
        # If the source and target are mismatched w.r.t. annotations vs
2712
2900
        # plain, the data needs to be converted accordingly
2713
2901
        if self.source.factory.annotated == self.target.factory.annotated:
2759
2947
                    # * already have it or
2760
2948
                    # * have it scheduled already
2761
2949
                    # otherwise we don't care
2762
 
                    assert (self.target.has_version(parent) or
 
2950
                    if not (self.target.has_version(parent) or
2763
2951
                            parent in copy_set or
2764
 
                            not self.source.has_version(parent))
 
2952
                            not self.source.has_version(parent)):
 
2953
                        raise AssertionError("problem joining parent %r "
 
2954
                            "from %r to %r"
 
2955
                            % (parent, self.source, self.target))
2765
2956
                index_memo = self.source._index.get_position(version_id)
2766
2957
                copy_queue_records.append((version_id, index_memo))
2767
2958
                copy_queue.append((version_id, options, parents))
2772
2963
            total = len(version_list)
2773
2964
            raw_datum = []
2774
2965
            raw_records = []
2775
 
            for (version_id, raw_data), \
 
2966
            for (version_id, raw_data, _), \
2776
2967
                (version_id2, options, parents) in \
2777
2968
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
2778
2969
                     copy_queue):
2779
 
                assert version_id == version_id2, 'logic error, inconsistent results'
 
2970
                if not (version_id == version_id2):
 
2971
                    raise AssertionError('logic error, inconsistent results')
2780
2972
                count = count + 1
2781
2973
                pb.update("Joining knit", count, total)
2782
2974
                if converter:
2812
3004
    """Optimised code paths for weave to knit operations."""
2813
3005
    
2814
3006
    _matching_file_from_factory = bzrlib.weave.WeaveFile
2815
 
    _matching_file_to_factory = KnitVersionedFile
 
3007
    _matching_file_to_factory = staticmethod(make_file_knit)
2816
3008
    
2817
3009
    @staticmethod
2818
3010
    def is_compatible(source, target):
2825
3017
 
2826
3018
    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2827
3019
        """See InterVersionedFile.join."""
2828
 
        assert isinstance(self.source, bzrlib.weave.Weave)
2829
 
        assert isinstance(self.target, KnitVersionedFile)
2830
 
 
2831
3020
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2832
3021
 
2833
3022
        if not version_ids:
2859
3048
                # check that its will be a consistent copy:
2860
3049
                for parent in parents:
2861
3050
                    # if source has the parent, we must already have it
2862
 
                    assert (self.target.has_version(parent))
 
3051
                    if not self.target.has_version(parent):
 
3052
                        raise AssertionError("%r does not have parent %r"
 
3053
                            % (self.target, parent))
2863
3054
                self.target.add_lines(
2864
3055
                    version_id, parents, self.source.get_lines(version_id))
2865
3056
                count = count + 1
3035
3226
                # add a key, no parents
3036
3227
                self._revision_id_graph[missing_version] = ()
3037
3228
                pending.discard(missing_version) # don't look for it
3038
 
        # XXX: This should probably be a real exception, as it is a data
3039
 
        #      inconsistency
3040
 
        assert not self._ghosts.intersection(self._compression_children), \
3041
 
            "We cannot have nodes which have a compression parent of a ghost."
 
3229
        if self._ghosts.intersection(self._compression_children):
 
3230
            raise KnitCorrupt(
 
3231
                "We cannot have nodes which have a ghost compression parent:\n"
 
3232
                "ghosts: %r\n"
 
3233
                "compression children: %r"
 
3234
                % (self._ghosts, self._compression_children))
3042
3235
        # Cleanout anything that depends on a ghost so that we don't wait for
3043
3236
        # the ghost to show up
3044
3237
        for node in self._ghosts:
3072
3265
            if len(parent_ids) == 0:
3073
3266
                # There are no parents for this node, so just add it
3074
3267
                # TODO: This probably needs to be decoupled
3075
 
                assert compression_parent is None
3076
3268
                fulltext_content, delta = self._knit.factory.parse_record(
3077
3269
                    rev_id, record, record_details, None)
3078
3270
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
3089
3281
                 record_details) = self._all_build_details[rev_id]
3090
3282
                if compression_parent is not None:
3091
3283
                    comp_children = self._compression_children[compression_parent]
3092
 
                    assert rev_id in comp_children
 
3284
                    if rev_id not in comp_children:
 
3285
                        raise AssertionError("%r not in compression children %r"
 
3286
                            % (rev_id, comp_children))
3093
3287
                    # If there is only 1 child, it is safe to reuse this
3094
3288
                    # content
3095
3289
                    reuse_content = (len(comp_children) == 1