/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: John Arbash Meinel
  • Date: 2008-06-05 16:27:16 UTC
  • mfrom: (3475 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3476.
  • Revision ID: john@arbash-meinel.com-20080605162716-a3hn238tnctbfd8j
merge bzr.dev, resolve NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
60
60
# record content length ?
61
61
                  
62
62
 
63
 
from copy import copy
64
63
from cStringIO import StringIO
65
64
from itertools import izip, chain
66
65
import operator
74
73
lazy_import(globals(), """
75
74
from bzrlib import (
76
75
    annotate,
 
76
    graph as _mod_graph,
77
77
    lru_cache,
78
78
    pack,
79
79
    trace,
100
100
    RevisionNotPresent,
101
101
    RevisionAlreadyPresent,
102
102
    )
103
 
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
 
103
from bzrlib.graph import Graph
104
104
from bzrlib.osutils import (
105
105
    contains_whitespace,
106
106
    contains_linebreaks,
107
107
    sha_string,
108
108
    sha_strings,
 
109
    split_lines,
109
110
    )
110
 
from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
111
111
from bzrlib.tsort import topo_sort
 
112
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
112
113
import bzrlib.ui
 
114
from bzrlib.versionedfile import (
 
115
    AbsentContentFactory,
 
116
    adapter_registry,
 
117
    ContentFactory,
 
118
    InterVersionedFile,
 
119
    VersionedFile,
 
120
    )
113
121
import bzrlib.weave
114
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
115
122
 
116
123
 
117
124
# TODO: Split out code specific to this format into an associated object.
131
138
INDEX_SUFFIX = '.kndx'
132
139
 
133
140
 
 
141
class KnitAdapter(object):
 
142
    """Base class for knit record adaption."""
 
143
 
 
144
    def __init__(self, basis_vf):
 
145
        """Create an adapter which accesses full texts from basis_vf.
 
146
        
 
147
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
148
            May be None for adapters that do not need to access basis texts.
 
149
        """
 
150
        self._data = _KnitData(None)
 
151
        self._annotate_factory = KnitAnnotateFactory()
 
152
        self._plain_factory = KnitPlainFactory()
 
153
        self._basis_vf = basis_vf
 
154
 
 
155
 
 
156
class FTAnnotatedToUnannotated(KnitAdapter):
 
157
    """An adapter from FT annotated knits to unannotated ones."""
 
158
 
 
159
    def get_bytes(self, factory, annotated_compressed_bytes):
 
160
        rec, contents = \
 
161
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
162
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
163
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
 
164
        return bytes
 
165
 
 
166
 
 
167
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
168
    """An adapter for deltas from annotated to unannotated."""
 
169
 
 
170
    def get_bytes(self, factory, annotated_compressed_bytes):
 
171
        rec, contents = \
 
172
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
173
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
174
            plain=True)
 
175
        contents = self._plain_factory.lower_line_delta(delta)
 
176
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
 
177
        return bytes
 
178
 
 
179
 
 
180
class FTAnnotatedToFullText(KnitAdapter):
 
181
    """An adapter from FT annotated knits to unannotated ones."""
 
182
 
 
183
    def get_bytes(self, factory, annotated_compressed_bytes):
 
184
        rec, contents = \
 
185
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
186
        content, delta = self._annotate_factory.parse_record(factory.key[0],
 
187
            contents, factory._build_details, None)
 
188
        return ''.join(content.text())
 
189
 
 
190
 
 
191
class DeltaAnnotatedToFullText(KnitAdapter):
 
192
    """An adapter for deltas from annotated to unannotated."""
 
193
 
 
194
    def get_bytes(self, factory, annotated_compressed_bytes):
 
195
        rec, contents = \
 
196
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
197
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
198
            plain=True)
 
199
        compression_parent = factory.parents[0][0]
 
200
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
201
        # Manually apply the delta because we have one annotated content and
 
202
        # one plain.
 
203
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
204
        basis_content.apply_delta(delta, rec[1])
 
205
        basis_content._should_strip_eol = factory._build_details[1]
 
206
        return ''.join(basis_content.text())
 
207
 
 
208
 
 
209
class FTPlainToFullText(KnitAdapter):
 
210
    """An adapter from FT plain knits to unannotated ones."""
 
211
 
 
212
    def get_bytes(self, factory, compressed_bytes):
 
213
        rec, contents = \
 
214
            self._data._parse_record_unchecked(compressed_bytes)
 
215
        content, delta = self._plain_factory.parse_record(factory.key[0],
 
216
            contents, factory._build_details, None)
 
217
        return ''.join(content.text())
 
218
 
 
219
 
 
220
class DeltaPlainToFullText(KnitAdapter):
 
221
    """An adapter for deltas from annotated to unannotated."""
 
222
 
 
223
    def get_bytes(self, factory, compressed_bytes):
 
224
        rec, contents = \
 
225
            self._data._parse_record_unchecked(compressed_bytes)
 
226
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
227
        compression_parent = factory.parents[0][0]
 
228
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
229
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
230
        # Manually apply the delta because we have one annotated content and
 
231
        # one plain.
 
232
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
233
            factory._build_details, basis_content)
 
234
        return ''.join(content.text())
 
235
 
 
236
 
 
237
class KnitContentFactory(ContentFactory):
 
238
    """Content factory for streaming from knits.
 
239
    
 
240
    :seealso ContentFactory:
 
241
    """
 
242
 
 
243
    def __init__(self, version, parents, build_details, sha1, raw_record,
 
244
        annotated, knit=None):
 
245
        """Create a KnitContentFactory for version.
 
246
        
 
247
        :param version: The version.
 
248
        :param parents: The parents.
 
249
        :param build_details: The build details as returned from
 
250
            get_build_details.
 
251
        :param sha1: The sha1 expected from the full text of this object.
 
252
        :param raw_record: The bytes of the knit data from disk.
 
253
        :param annotated: True if the raw data is annotated.
 
254
        """
 
255
        ContentFactory.__init__(self)
 
256
        self.sha1 = sha1
 
257
        self.key = (version,)
 
258
        self.parents = tuple((parent,) for parent in parents)
 
259
        if build_details[0] == 'line-delta':
 
260
            kind = 'delta'
 
261
        else:
 
262
            kind = 'ft'
 
263
        if annotated:
 
264
            annotated_kind = 'annotated-'
 
265
        else:
 
266
            annotated_kind = ''
 
267
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
268
        self._raw_record = raw_record
 
269
        self._build_details = build_details
 
270
        self._knit = knit
 
271
 
 
272
    def get_bytes_as(self, storage_kind):
 
273
        if storage_kind == self.storage_kind:
 
274
            return self._raw_record
 
275
        if storage_kind == 'fulltext' and self._knit is not None:
 
276
            return self._knit.get_text(self.key[0])
 
277
        else:
 
278
            raise errors.UnavailableRepresentation(self.key, storage_kind,
 
279
                self.storage_kind)
 
280
 
 
281
 
134
282
class KnitContent(object):
135
 
    """Content of a knit version to which deltas can be applied."""
 
283
    """Content of a knit version to which deltas can be applied.
 
284
    
 
285
    This is always stored in memory as a list of lines with \n at the end,
 
286
    plus a flag saying if the final ending is really there or not, because that 
 
287
    corresponds to the on-disk knit representation.
 
288
    """
136
289
 
137
 
    def annotate(self):
138
 
        """Return a list of (origin, text) tuples."""
139
 
        return list(self.annotate_iter())
 
290
    def __init__(self):
 
291
        self._should_strip_eol = False
140
292
 
141
293
    def apply_delta(self, delta, new_version_id):
142
294
        """Apply delta to this object to become new_version_id."""
187
339
    """Annotated content."""
188
340
 
189
341
    def __init__(self, lines):
 
342
        KnitContent.__init__(self)
190
343
        self._lines = lines
191
344
 
192
 
    def annotate_iter(self):
193
 
        """Yield tuples of (origin, text) for each content line."""
194
 
        return iter(self._lines)
 
345
    def annotate(self):
 
346
        """Return a list of (origin, text) for each content line."""
 
347
        lines = self._lines[:]
 
348
        if self._should_strip_eol:
 
349
            origin, last_line = lines[-1]
 
350
            lines[-1] = (origin, last_line.rstrip('\n'))
 
351
        return lines
195
352
 
196
353
    def apply_delta(self, delta, new_version_id):
197
354
        """Apply delta to this object to become new_version_id."""
201
358
            lines[offset+start:offset+end] = delta_lines
202
359
            offset = offset + (start - end) + count
203
360
 
204
 
    def strip_last_line_newline(self):
205
 
        line = self._lines[-1][1].rstrip('\n')
206
 
        self._lines[-1] = (self._lines[-1][0], line)
207
 
 
208
361
    def text(self):
209
362
        try:
210
 
            return [text for origin, text in self._lines]
 
363
            lines = [text for origin, text in self._lines]
211
364
        except ValueError, e:
212
365
            # most commonly (only?) caused by the internal form of the knit
213
366
            # missing annotation information because of a bug - see thread
215
368
            raise KnitCorrupt(self,
216
369
                "line in annotated knit missing annotation information: %s"
217
370
                % (e,))
 
371
        if self._should_strip_eol:
 
372
            lines[-1] = lines[-1].rstrip('\n')
 
373
        return lines
218
374
 
219
375
    def copy(self):
220
376
        return AnnotatedKnitContent(self._lines[:])
229
385
    """
230
386
 
231
387
    def __init__(self, lines, version_id):
 
388
        KnitContent.__init__(self)
232
389
        self._lines = lines
233
390
        self._version_id = version_id
234
391
 
235
 
    def annotate_iter(self):
236
 
        """Yield tuples of (origin, text) for each content line."""
237
 
        for line in self._lines:
238
 
            yield self._version_id, line
 
392
    def annotate(self):
 
393
        """Return a list of (origin, text) for each content line."""
 
394
        return [(self._version_id, line) for line in self._lines]
239
395
 
240
396
    def apply_delta(self, delta, new_version_id):
241
397
        """Apply delta to this object to become new_version_id."""
249
405
    def copy(self):
250
406
        return PlainKnitContent(self._lines[:], self._version_id)
251
407
 
252
 
    def strip_last_line_newline(self):
253
 
        self._lines[-1] = self._lines[-1].rstrip('\n')
254
 
 
255
408
    def text(self):
256
 
        return self._lines
257
 
 
258
 
 
259
 
class KnitAnnotateFactory(object):
 
409
        lines = self._lines
 
410
        if self._should_strip_eol:
 
411
            lines = lines[:]
 
412
            lines[-1] = lines[-1].rstrip('\n')
 
413
        return lines
 
414
 
 
415
 
 
416
class _KnitFactory(object):
 
417
    """Base class for common Factory functions."""
 
418
 
 
419
    def parse_record(self, version_id, record, record_details,
 
420
                     base_content, copy_base_content=True):
 
421
        """Parse a record into a full content object.
 
422
 
 
423
        :param version_id: The official version id for this content
 
424
        :param record: The data returned by read_records_iter()
 
425
        :param record_details: Details about the record returned by
 
426
            get_build_details
 
427
        :param base_content: If get_build_details returns a compression_parent,
 
428
            you must return a base_content here, else use None
 
429
        :param copy_base_content: When building from the base_content, decide
 
430
            you can either copy it and return a new object, or modify it in
 
431
            place.
 
432
        :return: (content, delta) A Content object and possibly a line-delta,
 
433
            delta may be None
 
434
        """
 
435
        method, noeol = record_details
 
436
        if method == 'line-delta':
 
437
            if copy_base_content:
 
438
                content = base_content.copy()
 
439
            else:
 
440
                content = base_content
 
441
            delta = self.parse_line_delta(record, version_id)
 
442
            content.apply_delta(delta, version_id)
 
443
        else:
 
444
            content = self.parse_fulltext(record, version_id)
 
445
            delta = None
 
446
        content._should_strip_eol = noeol
 
447
        return (content, delta)
 
448
 
 
449
 
 
450
class KnitAnnotateFactory(_KnitFactory):
260
451
    """Factory for creating annotated Content objects."""
261
452
 
262
453
    annotated = True
363
554
                       for origin, text in lines)
364
555
        return out
365
556
 
366
 
    def annotate_iter(self, knit, version_id):
 
557
    def annotate(self, knit, version_id):
367
558
        content = knit._get_content(version_id)
368
 
        return content.annotate_iter()
369
 
 
370
 
 
371
 
class KnitPlainFactory(object):
 
559
        return content.annotate()
 
560
 
 
561
 
 
562
class KnitPlainFactory(_KnitFactory):
372
563
    """Factory for creating plain Content objects."""
373
564
 
374
565
    annotated = False
425
616
            out.extend(lines)
426
617
        return out
427
618
 
428
 
    def annotate_iter(self, knit, version_id):
429
 
        return annotate_knit(knit, version_id)
 
619
    def annotate(self, knit, version_id):
 
620
        annotator = _KnitAnnotator(knit)
 
621
        return annotator.annotate(version_id)
430
622
 
431
623
 
432
624
def make_empty_knit(transport, relpath):
433
625
    """Construct a empty knit at the specified location."""
434
 
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
 
626
    k = make_file_knit(transport, relpath, 'w', KnitPlainFactory)
 
627
 
 
628
 
 
629
def make_file_knit(name, transport, file_mode=None, access_mode='w',
 
630
    factory=None, delta=True, create=False, create_parent_dir=False,
 
631
    delay_create=False, dir_mode=None, get_scope=None):
 
632
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
 
633
    if factory is None:
 
634
        factory = KnitAnnotateFactory()
 
635
    if get_scope is None:
 
636
        get_scope = lambda:None
 
637
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
 
638
        access_mode, create=create, file_mode=file_mode,
 
639
        create_parent_dir=create_parent_dir, delay_create=delay_create,
 
640
        dir_mode=dir_mode, get_scope=get_scope)
 
641
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
 
642
        dir_mode, ((create and not len(index)) and delay_create),
 
643
        create_parent_dir)
 
644
    return KnitVersionedFile(name, transport, factory=factory,
 
645
        create=create, delay_create=delay_create, index=index,
 
646
        access_method=access)
 
647
 
 
648
 
 
649
def get_suffixes():
 
650
    """Return the suffixes used by file based knits."""
 
651
    return [DATA_SUFFIX, INDEX_SUFFIX]
 
652
make_file_knit.get_suffixes = get_suffixes
435
653
 
436
654
 
437
655
class KnitVersionedFile(VersionedFile):
449
667
    stored and retrieved.
450
668
    """
451
669
 
452
 
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
 
670
    def __init__(self, relpath, transport, file_mode=None,
453
671
        factory=None, delta=True, create=False, create_parent_dir=False,
454
672
        delay_create=False, dir_mode=None, index=None, access_method=None):
455
673
        """Construct a knit at location specified by relpath.
462
680
            actually be created until the first data is stored.
463
681
        :param index: An index to use for the knit.
464
682
        """
465
 
        if access_mode is None:
466
 
            access_mode = 'w'
467
 
        super(KnitVersionedFile, self).__init__(access_mode)
468
 
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
 
683
        super(KnitVersionedFile, self).__init__()
469
684
        self.transport = transport
470
685
        self.filename = relpath
471
686
        self.factory = factory or KnitAnnotateFactory()
472
 
        self.writable = (access_mode == 'w')
473
687
        self.delta = delta
474
688
 
475
689
        self._max_delta_chain = 200
476
690
 
477
 
        if index is None:
478
 
            self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
479
 
                access_mode, create=create, file_mode=file_mode,
480
 
                create_parent_dir=create_parent_dir, delay_create=delay_create,
481
 
                dir_mode=dir_mode)
482
 
        else:
483
 
            self._index = index
484
 
        if access_method is None:
485
 
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
486
 
                ((create and not len(self)) and delay_create), create_parent_dir)
487
 
        else:
488
 
            _access = access_method
 
691
        if None in (access_method, index):
 
692
            raise ValueError("No default access_method or index any more")
 
693
        self._index = index
 
694
        _access = access_method
489
695
        if create and not len(self) and not delay_create:
490
696
            _access.create()
491
697
        self._data = _KnitData(_access)
516
722
                fulltext_size = size
517
723
                break
518
724
            delta_size += size
519
 
            delta_parents = self._index.get_parents(parent)
 
725
            delta_parents = self._index.get_parent_map([parent])[parent]
520
726
        else:
521
727
            # We couldn't find a fulltext, so we must create a new one
522
728
            return False
523
729
 
524
730
        return fulltext_size > delta_size
525
731
 
 
732
    def _check_write_ok(self):
 
733
        return self._index._check_write_ok()
 
734
 
526
735
    def _add_raw_records(self, records, data):
527
736
        """Add all the records 'records' with data pre-joined in 'data'.
528
737
 
534
743
        # write all the data
535
744
        raw_record_sizes = [record[3] for record in records]
536
745
        positions = self._data.add_raw_records(raw_record_sizes, data)
537
 
        offset = 0
538
746
        index_entries = []
539
 
        for (version_id, options, parents, size), access_memo in zip(
 
747
        for (version_id, options, parents, _), access_memo in zip(
540
748
            records, positions):
541
749
            index_entries.append((version_id, options, access_memo, parents))
542
 
            if self._data._do_cache:
543
 
                self._data._cache[version_id] = data[offset:offset+size]
544
 
            offset += size
545
750
        self._index.add_versions(index_entries)
546
751
 
547
 
    def enable_cache(self):
548
 
        """Start caching data for this knit"""
549
 
        self._data.enable_cache()
550
 
 
551
 
    def clear_cache(self):
552
 
        """Clear the data cache only."""
553
 
        self._data.clear_cache()
554
 
 
555
752
    def copy_to(self, name, transport):
556
753
        """See VersionedFile.copy_to()."""
557
754
        # copy the current index to a temp index to avoid racing with local
567
764
        # move the copied index into place
568
765
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
569
766
 
570
 
    def create_empty(self, name, transport, mode=None):
571
 
        return KnitVersionedFile(name, transport, factory=self.factory,
572
 
                                 delta=self.delta, create=True)
573
 
    
574
767
    def get_data_stream(self, required_versions):
575
768
        """Get a data stream for the specified versions.
576
769
 
628
821
                # put them in anywhere, but we hope that sending them soon
629
822
                # after the fulltext will give good locality in the receiver
630
823
                ready_to_send[:0] = deferred.pop(version_id)
631
 
        assert len(deferred) == 0, \
632
 
            "Still have compressed child versions waiting to be sent"
 
824
        if not (len(deferred) == 0):
 
825
            raise AssertionError("Still have compressed child versions waiting to be sent")
633
826
        # XXX: The stream format is such that we cannot stream it - we have to
634
827
        # know the length of all the data a-priori.
635
828
        raw_datum = []
636
829
        result_version_list = []
637
 
        for (version_id, raw_data), \
 
830
        for (version_id, raw_data, _), \
638
831
            (version_id2, options, _, parents) in \
639
832
            izip(self._data.read_records_iter_raw(copy_queue_records),
640
833
                 temp_version_list):
641
 
            assert version_id == version_id2, \
642
 
                'logic error, inconsistent results'
 
834
            if not (version_id == version_id2):
 
835
                raise AssertionError('logic error, inconsistent results')
643
836
            raw_datum.append(raw_data)
644
837
            result_version_list.append(
645
838
                (version_id, options, len(raw_data), parents))
652
845
                return pseudo_file.read(length)
653
846
        return (self.get_format_signature(), result_version_list, read)
654
847
 
 
848
    def get_record_stream(self, versions, ordering, include_delta_closure):
 
849
        """Get a stream of records for versions.
 
850
 
 
851
        :param versions: The versions to include. Each version is a tuple
 
852
            (version,).
 
853
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
854
            sorted stream has compression parents strictly before their
 
855
            children.
 
856
        :param include_delta_closure: If True then the closure across any
 
857
            compression parents will be included (in the opaque data).
 
858
        :return: An iterator of ContentFactory objects, each of which is only
 
859
            valid until the iterator is advanced.
 
860
        """
 
861
        if include_delta_closure:
 
862
            # Nb: what we should do is plan the data to stream to allow
 
863
            # reconstruction of all the texts without excessive buffering,
 
864
            # including re-sending common bases as needed. This makes the most
 
865
            # sense when we start serialising these streams though, so for now
 
866
            # we just fallback to individual text construction behind the
 
867
            # abstraction barrier.
 
868
            knit = self
 
869
        else:
 
870
            knit = None
 
871
        # We end up doing multiple index lookups here for parents details and
 
872
        # disk layout details - we need a unified api ?
 
873
        parent_map = self.get_parent_map(versions)
 
874
        absent_versions = set(versions) - set(parent_map)
 
875
        if ordering == 'topological':
 
876
            present_versions = topo_sort(parent_map)
 
877
        else:
 
878
            # List comprehension to keep the requested order (as that seems
 
879
            # marginally useful, at least until we start doing IO optimising
 
880
            # here.
 
881
            present_versions = [version for version in versions if version in
 
882
                parent_map]
 
883
        position_map = self._get_components_positions(present_versions)
 
884
        records = [(version, position_map[version][1]) for version in
 
885
            present_versions]
 
886
        record_map = {}
 
887
        for version in absent_versions:
 
888
            yield AbsentContentFactory((version,))
 
889
        for version, raw_data, sha1 in \
 
890
                self._data.read_records_iter_raw(records):
 
891
            (record_details, index_memo, _) = position_map[version]
 
892
            yield KnitContentFactory(version, parent_map[version],
 
893
                record_details, sha1, raw_data, self.factory.annotated, knit)
 
894
 
655
895
    def _extract_blocks(self, version_id, source, target):
656
896
        if self._index.get_method(version_id) != 'line-delta':
657
897
            return None
661
901
    def get_delta(self, version_id):
662
902
        """Get a delta for constructing version from some other version."""
663
903
        self.check_not_reserved_id(version_id)
664
 
        parents = self.get_parents(version_id)
 
904
        parents = self.get_parent_map([version_id])[version_id]
665
905
        if len(parents):
666
906
            parent = parents[0]
667
907
        else:
692
932
            annotated_part = "plain"
693
933
        return "knit-%s" % (annotated_part,)
694
934
        
695
 
    def get_graph_with_ghosts(self):
696
 
        """See VersionedFile.get_graph_with_ghosts()."""
697
 
        graph_items = self._index.get_graph()
698
 
        return dict(graph_items)
699
 
 
700
 
    def get_sha1(self, version_id):
701
 
        return self.get_sha1s([version_id])[0]
702
 
 
703
935
    def get_sha1s(self, version_ids):
704
 
        """See VersionedFile.get_sha1()."""
 
936
        """See VersionedFile.get_sha1s()."""
705
937
        record_map = self._get_record_map(version_ids)
706
938
        # record entry 2 is the 'digest'.
707
939
        return [record_map[v][2] for v in version_ids]
708
940
 
709
 
    @staticmethod
710
 
    def get_suffixes():
711
 
        """See VersionedFile.get_suffixes()."""
712
 
        return [DATA_SUFFIX, INDEX_SUFFIX]
713
 
 
714
 
    def has_ghost(self, version_id):
715
 
        """True if there is a ghost reference in the file to version_id."""
716
 
        # maybe we have it
717
 
        if self.has_version(version_id):
718
 
            return False
719
 
        # optimisable if needed by memoising the _ghosts set.
720
 
        items = self._index.get_graph()
721
 
        for node, parents in items:
722
 
            for parent in parents:
723
 
                if parent not in self._index._cache:
724
 
                    if parent == version_id:
725
 
                        return True
726
 
        return False
727
 
 
728
941
    def insert_data_stream(self, (format, data_list, reader_callable)):
729
942
        """Insert knit records from a data stream into this knit.
730
943
 
741
954
                    'incompatible format signature inserting to %r', self)
742
955
            source = self._knit_from_datastream(
743
956
                (format, data_list, reader_callable))
744
 
            self.join(source)
 
957
            stream = source.get_record_stream(source.versions(), 'unordered', False)
 
958
            self.insert_record_stream(stream)
745
959
            return
746
960
 
747
961
        for version_id, options, length, parents in data_list:
759
973
                # Also check the SHA-1 of the fulltext this content will
760
974
                # produce.
761
975
                raw_data = reader_callable(length)
762
 
                my_fulltext_sha1 = self.get_sha1(version_id)
 
976
                my_fulltext_sha1 = self.get_sha1s([version_id])[0]
763
977
                df, rec = self._data._parse_record_header(version_id, raw_data)
764
978
                stream_fulltext_sha1 = rec[3]
765
979
                if my_fulltext_sha1 != stream_fulltext_sha1:
786
1000
                            'on the source repository, and "bzr reconcile" '
787
1001
                            'if necessary.' %
788
1002
                            (version_id, parents[0]))
 
1003
                    if not self.delta:
 
1004
                        # We received a line-delta record for a non-delta knit.
 
1005
                        # Convert it to a fulltext.
 
1006
                        gzip_bytes = reader_callable(length)
 
1007
                        self._convert_line_delta_to_fulltext(
 
1008
                            gzip_bytes, version_id, parents)
 
1009
                        continue
 
1010
 
789
1011
                self._add_raw_records(
790
1012
                    [(version_id, options, parents, length)],
791
1013
                    reader_callable(length))
792
1014
 
 
1015
    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
 
1016
        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
 
1017
        delta = self.factory.parse_line_delta(lines, version_id)
 
1018
        content = self.factory.make(self.get_lines(parents[0]), parents[0])
 
1019
        content.apply_delta(delta, version_id)
 
1020
        digest, len, content = self.add_lines(
 
1021
            version_id, parents, content.text())
 
1022
        if digest != sha1:
 
1023
            raise errors.VersionedFileInvalidChecksum(version_id)
 
1024
 
793
1025
    def _knit_from_datastream(self, (format, data_list, reader_callable)):
794
1026
        """Create a knit object from a data stream.
795
1027
 
807
1039
            factory = KnitAnnotateFactory()
808
1040
        else:
809
1041
            raise errors.KnitDataStreamUnknown(format)
810
 
        index = _StreamIndex(data_list)
 
1042
        index = _StreamIndex(data_list, self._index)
811
1043
        access = _StreamAccess(reader_callable, index, self, factory)
812
1044
        return KnitVersionedFile(self.filename, self.transport,
813
1045
            factory=factory, index=index, access_method=access)
814
1046
 
 
1047
    def insert_record_stream(self, stream):
 
1048
        """Insert a record stream into this versioned file.
 
1049
 
 
1050
        :param stream: A stream of records to insert. 
 
1051
        :return: None
 
1052
        :seealso VersionedFile.get_record_stream:
 
1053
        """
 
1054
        def get_adapter(adapter_key):
 
1055
            try:
 
1056
                return adapters[adapter_key]
 
1057
            except KeyError:
 
1058
                adapter_factory = adapter_registry.get(adapter_key)
 
1059
                adapter = adapter_factory(self)
 
1060
                adapters[adapter_key] = adapter
 
1061
                return adapter
 
1062
        if self.factory.annotated:
 
1063
            # self is annotated, we need annotated knits to use directly.
 
1064
            annotated = "annotated-"
 
1065
            convertibles = []
 
1066
        else:
 
1067
            # self is not annotated, but we can strip annotations cheaply.
 
1068
            annotated = ""
 
1069
            convertibles = set(["knit-annotated-delta-gz",
 
1070
                "knit-annotated-ft-gz"])
 
1071
        # The set of types we can cheaply adapt without needing basis texts.
 
1072
        native_types = set()
 
1073
        native_types.add("knit-%sdelta-gz" % annotated)
 
1074
        native_types.add("knit-%sft-gz" % annotated)
 
1075
        knit_types = native_types.union(convertibles)
 
1076
        adapters = {}
 
1077
        # Buffer all index entries that we can't add immediately because their
 
1078
        # basis parent is missing. We don't buffer all because generating
 
1079
        # annotations may require access to some of the new records. However we
 
1080
        # can't generate annotations from new deltas until their basis parent
 
1081
        # is present anyway, so we get away with not needing an index that
 
1082
        # includes the new keys.
 
1083
        # key = basis_parent, value = index entry to add
 
1084
        buffered_index_entries = {}
 
1085
        for record in stream:
 
1086
            # Raise an error when a record is missing.
 
1087
            if record.storage_kind == 'absent':
 
1088
                raise RevisionNotPresent([record.key[0]], self)
 
1089
            # adapt to non-tuple interface
 
1090
            parents = [parent[0] for parent in record.parents]
 
1091
            if record.storage_kind in knit_types:
 
1092
                if record.storage_kind not in native_types:
 
1093
                    try:
 
1094
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1095
                        adapter = get_adapter(adapter_key)
 
1096
                    except KeyError:
 
1097
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1098
                        adapter = get_adapter(adapter_key)
 
1099
                    bytes = adapter.get_bytes(
 
1100
                        record, record.get_bytes_as(record.storage_kind))
 
1101
                else:
 
1102
                    bytes = record.get_bytes_as(record.storage_kind)
 
1103
                options = [record._build_details[0]]
 
1104
                if record._build_details[1]:
 
1105
                    options.append('no-eol')
 
1106
                # Just blat it across.
 
1107
                # Note: This does end up adding data on duplicate keys. As
 
1108
                # modern repositories use atomic insertions this should not
 
1109
                # lead to excessive growth in the event of interrupted fetches.
 
1110
                # 'knit' repositories may suffer excessive growth, but as a
 
1111
                # deprecated format this is tolerable. It can be fixed if
 
1112
                # needed by in the kndx index support raising on a duplicate
 
1113
                # add with identical parents and options.
 
1114
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
 
1115
                index_entry = (record.key[0], options, access_memo, parents)
 
1116
                buffered = False
 
1117
                if 'fulltext' not in options:
 
1118
                    basis_parent = parents[0]
 
1119
                    if not self.has_version(basis_parent):
 
1120
                        pending = buffered_index_entries.setdefault(
 
1121
                            basis_parent, [])
 
1122
                        pending.append(index_entry)
 
1123
                        buffered = True
 
1124
                if not buffered:
 
1125
                    self._index.add_versions([index_entry])
 
1126
            elif record.storage_kind == 'fulltext':
 
1127
                self.add_lines(record.key[0], parents,
 
1128
                    split_lines(record.get_bytes_as('fulltext')))
 
1129
            else:
 
1130
                adapter_key = record.storage_kind, 'fulltext'
 
1131
                adapter = get_adapter(adapter_key)
 
1132
                lines = split_lines(adapter.get_bytes(
 
1133
                    record, record.get_bytes_as(record.storage_kind)))
 
1134
                try:
 
1135
                    self.add_lines(record.key[0], parents, lines)
 
1136
                except errors.RevisionAlreadyPresent:
 
1137
                    pass
 
1138
            # Add any records whose basis parent is now available.
 
1139
            added_keys = [record.key[0]]
 
1140
            while added_keys:
 
1141
                key = added_keys.pop(0)
 
1142
                if key in buffered_index_entries:
 
1143
                    index_entries = buffered_index_entries[key]
 
1144
                    self._index.add_versions(index_entries)
 
1145
                    added_keys.extend(
 
1146
                        [index_entry[0] for index_entry in index_entries])
 
1147
                    del buffered_index_entries[key]
 
1148
        # If there were any deltas which had a missing basis parent, error.
 
1149
        if buffered_index_entries:
 
1150
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
 
1151
                self)
 
1152
 
815
1153
    def versions(self):
816
1154
        """See VersionedFile.versions."""
817
1155
        if 'evil' in debug.debug_flags:
847
1185
                for i, j, n in seq.get_matching_blocks():
848
1186
                    if n == 0:
849
1187
                        continue
850
 
                    # this appears to copy (origin, text) pairs across to the
851
 
                    # new content for any line that matches the last-checked
 
1188
                    # this copies (origin, text) pairs across to the new
 
1189
                    # content for any line that matches the last-checked
852
1190
                    # parent.
853
1191
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
854
1192
        if delta:
874
1212
 
875
1213
        This data is intended to be used for retrieving the knit records.
876
1214
 
877
 
        A dict of version_id to (method, index_memo, next) is
 
1215
        A dict of version_id to (record_details, index_memo, next, parents) is
878
1216
        returned.
879
1217
        method is the way referenced data should be applied.
880
 
        data_pos is the position of the data in the knit.
881
 
        data_size is the size of the data in the knit.
 
1218
        index_memo is the handle to pass to the data access to actually get the
 
1219
            data
882
1220
        next is the build-parent of the version, or None for fulltexts.
 
1221
        parents is the version_ids of the parents of this version
883
1222
        """
884
1223
        component_data = {}
885
1224
        pending_components = version_ids
886
1225
        while pending_components:
887
1226
            build_details = self._index.get_build_details(pending_components)
 
1227
            current_components = set(pending_components)
888
1228
            pending_components = set()
889
 
            for version_id, details in build_details.items():
890
 
                method, index_memo, compression_parent = details
 
1229
            for version_id, details in build_details.iteritems():
 
1230
                (index_memo, compression_parent, parents,
 
1231
                 record_details) = details
 
1232
                method = record_details[0]
891
1233
                if compression_parent is not None:
892
1234
                    pending_components.add(compression_parent)
893
 
                component_data[version_id] = details
 
1235
                component_data[version_id] = (record_details, index_memo,
 
1236
                                              compression_parent)
 
1237
            missing = current_components.difference(build_details)
 
1238
            if missing:
 
1239
                raise errors.RevisionNotPresent(missing.pop(), self.filename)
894
1240
        return component_data
895
1241
       
896
1242
    def _get_content(self, version_id, parent_texts={}):
910
1256
        self._index.check_versions_present(version_ids)
911
1257
 
912
1258
    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts,
913
 
        nostore_sha, random_id, check_content):
 
1259
        nostore_sha, random_id, check_content, left_matching_blocks):
914
1260
        """See VersionedFile.add_lines_with_ghosts()."""
915
1261
        self._check_add(version_id, lines, random_id, check_content)
916
1262
        return self._add(version_id, lines, parents, self.delta,
917
 
            parent_texts, None, nostore_sha, random_id)
 
1263
            parent_texts, left_matching_blocks, nostore_sha, random_id)
918
1264
 
919
1265
    def _add_lines(self, version_id, parents, lines, parent_texts,
920
1266
        left_matching_blocks, nostore_sha, random_id, check_content):
984
1330
            # I/O and the time spend applying deltas.
985
1331
            delta = self._check_should_delta(present_parents)
986
1332
 
987
 
        assert isinstance(version_id, str)
988
1333
        content = self.factory.make(lines, version_id)
 
1334
        if 'no-eol' in options:
 
1335
            # Hint to the content object that its text() call should strip the
 
1336
            # EOL.
 
1337
            content._should_strip_eol = True
989
1338
        if delta or (self.factory.annotated and len(present_parents) > 0):
990
1339
            # Merge annotations from parent texts if needed.
991
1340
            delta_hunks = self._merge_annotations(content, present_parents,
1020
1369
 
1021
1370
    def check(self, progress_bar=None):
1022
1371
        """See VersionedFile.check()."""
1023
 
 
1024
 
    def _clone_text(self, new_version_id, old_version_id, parents):
1025
 
        """See VersionedFile.clone_text()."""
1026
 
        # FIXME RBC 20060228 make fast by only inserting an index with null 
1027
 
        # delta.
1028
 
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))
 
1372
        # This doesn't actually test extraction of everything, but that will
 
1373
        # impact 'bzr check' substantially, and needs to be integrated with
 
1374
        # care. However, it does check for the obvious problem of a delta with
 
1375
        # no basis.
 
1376
        versions = self.versions()
 
1377
        parent_map = self.get_parent_map(versions)
 
1378
        for version in versions:
 
1379
            if self._index.get_method(version) != 'fulltext':
 
1380
                compression_parent = parent_map[version][0]
 
1381
                if compression_parent not in parent_map:
 
1382
                    raise errors.KnitCorrupt(self,
 
1383
                        "Missing basis parent %s for %s" % (
 
1384
                        compression_parent, version))
1029
1385
 
1030
1386
    def get_lines(self, version_id):
1031
1387
        """See VersionedFile.get_lines()."""
1034
1390
    def _get_record_map(self, version_ids):
1035
1391
        """Produce a dictionary of knit records.
1036
1392
        
1037
 
        The keys are version_ids, the values are tuples of (method, content,
1038
 
        digest, next).
1039
 
        method is the way the content should be applied.  
1040
 
        content is a KnitContent object.
1041
 
        digest is the SHA1 digest of this version id after all steps are done
1042
 
        next is the build-parent of the version, i.e. the leftmost ancestor.
1043
 
        If the method is fulltext, next will be None.
 
1393
        :return: {version_id:(record, record_details, digest, next)}
 
1394
            record
 
1395
                data returned from read_records
 
1396
            record_details
 
1397
                opaque information to pass to parse_record
 
1398
            digest
 
1399
                SHA1 digest of the full text after all steps are done
 
1400
            next
 
1401
                build-parent of the version, i.e. the leftmost ancestor.
 
1402
                Will be None if the record is not a delta.
1044
1403
        """
1045
1404
        position_map = self._get_components_positions(version_ids)
1046
 
        # c = component_id, m = method, i_m = index_memo, n = next
1047
 
        records = [(c, i_m) for c, (m, i_m, n) in position_map.iteritems()]
 
1405
        # c = component_id, r = record_details, i_m = index_memo, n = next
 
1406
        records = [(c, i_m) for c, (r, i_m, n)
 
1407
                             in position_map.iteritems()]
1048
1408
        record_map = {}
1049
 
        for component_id, content, digest in \
 
1409
        for component_id, record, digest in \
1050
1410
                self._data.read_records_iter(records):
1051
 
            method, index_memo, next = position_map[component_id]
1052
 
            record_map[component_id] = method, content, digest, next
1053
 
                          
 
1411
            (record_details, index_memo, next) = position_map[component_id]
 
1412
            record_map[component_id] = record, record_details, digest, next
 
1413
 
1054
1414
        return record_map
1055
1415
 
1056
1416
    def get_text(self, version_id):
1091
1451
            components = []
1092
1452
            cursor = version_id
1093
1453
            while cursor is not None:
1094
 
                method, data, digest, next = record_map[cursor]
1095
 
                components.append((cursor, method, data, digest))
 
1454
                record, record_details, digest, next = record_map[cursor]
 
1455
                components.append((cursor, record, record_details, digest))
1096
1456
                if cursor in content_map:
1097
1457
                    break
1098
1458
                cursor = next
1099
1459
 
1100
1460
            content = None
1101
 
            for component_id, method, data, digest in reversed(components):
 
1461
            for (component_id, record, record_details,
 
1462
                 digest) in reversed(components):
1102
1463
                if component_id in content_map:
1103
1464
                    content = content_map[component_id]
1104
1465
                else:
1105
 
                    if method == 'fulltext':
1106
 
                        assert content is None
1107
 
                        content = self.factory.parse_fulltext(data, version_id)
1108
 
                    elif method == 'line-delta':
1109
 
                        delta = self.factory.parse_line_delta(data, version_id)
1110
 
                        if multiple_versions:
1111
 
                            # only doing this when we want multiple versions
1112
 
                            # output avoids list copies - which reference and
1113
 
                            # dereference many strings.
1114
 
                            content = content.copy()
1115
 
                        content.apply_delta(delta, version_id)
 
1466
                    content, delta = self.factory.parse_record(version_id,
 
1467
                        record, record_details, content,
 
1468
                        copy_base_content=multiple_versions)
1116
1469
                    if multiple_versions:
1117
1470
                        content_map[component_id] = content
1118
1471
 
1119
 
            if 'no-eol' in self._index.get_options(version_id):
1120
 
                if multiple_versions:
1121
 
                    content = content.copy()
1122
 
                content.strip_last_line_newline()
1123
1472
            final_content[version_id] = content
1124
1473
 
1125
1474
            # digest here is the digest from the last applied component.
1162
1511
            enumerate(self._data.read_records_iter(version_id_records)):
1163
1512
            pb.update('Walking content.', version_idx, total)
1164
1513
            method = self._index.get_method(version_id)
1165
 
 
1166
 
            assert method in ('fulltext', 'line-delta')
1167
1514
            if method == 'fulltext':
1168
1515
                line_iterator = self.factory.get_fulltext_content(data)
1169
 
            else:
 
1516
            elif method == 'line-delta':
1170
1517
                line_iterator = self.factory.get_linedelta_content(data)
 
1518
            else:
 
1519
                raise ValueError('invalid method %r' % (method,))
1171
1520
            # XXX: It might be more efficient to yield (version_id,
1172
1521
            # line_iterator) in the future. However for now, this is a simpler
1173
1522
            # change to integrate into the rest of the codebase. RBC 20071110
1176
1525
 
1177
1526
        pb.update('Walking content.', total, total)
1178
1527
        
1179
 
    def iter_parents(self, version_ids):
1180
 
        """Iterate through the parents for many version ids.
1181
 
 
1182
 
        :param version_ids: An iterable yielding version_ids.
1183
 
        :return: An iterator that yields (version_id, parents). Requested 
1184
 
            version_ids not present in the versioned file are simply skipped.
1185
 
            The order is undefined, allowing for different optimisations in
1186
 
            the underlying implementation.
1187
 
        """
1188
 
        return self._index.iter_parents(version_ids)
1189
 
 
1190
1528
    def num_versions(self):
1191
1529
        """See VersionedFile.num_versions()."""
1192
1530
        return self._index.num_versions()
1193
1531
 
1194
1532
    __len__ = num_versions
1195
1533
 
1196
 
    def annotate_iter(self, version_id):
1197
 
        """See VersionedFile.annotate_iter."""
1198
 
        return self.factory.annotate_iter(self, version_id)
1199
 
 
1200
 
    def get_parents(self, version_id):
1201
 
        """See VersionedFile.get_parents."""
1202
 
        # perf notes:
1203
 
        # optimism counts!
1204
 
        # 52554 calls in 1264 872 internal down from 3674
1205
 
        try:
1206
 
            return self._index.get_parents(version_id)
1207
 
        except KeyError:
1208
 
            raise RevisionNotPresent(version_id, self.filename)
1209
 
 
1210
 
    def get_parents_with_ghosts(self, version_id):
1211
 
        """See VersionedFile.get_parents."""
1212
 
        try:
1213
 
            return self._index.get_parents_with_ghosts(version_id)
1214
 
        except KeyError:
1215
 
            raise RevisionNotPresent(version_id, self.filename)
 
1534
    def annotate(self, version_id):
 
1535
        """See VersionedFile.annotate."""
 
1536
        return self.factory.annotate(self, version_id)
 
1537
 
 
1538
    def get_parent_map(self, version_ids):
 
1539
        """See VersionedFile.get_parent_map."""
 
1540
        return self._index.get_parent_map(version_ids)
1216
1541
 
1217
1542
    def get_ancestry(self, versions, topo_sorted=True):
1218
1543
        """See VersionedFile.get_ancestry."""
1350
1675
                                   parents,
1351
1676
                                   index)
1352
1677
 
 
1678
    def _check_write_ok(self):
 
1679
        if self._get_scope() != self._scope:
 
1680
            raise errors.OutSideTransaction()
 
1681
        if self._mode != 'w':
 
1682
            raise errors.ReadOnlyObjectDirtiedError(self)
 
1683
 
1353
1684
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
1354
 
                 create_parent_dir=False, delay_create=False, dir_mode=None):
 
1685
        create_parent_dir=False, delay_create=False, dir_mode=None,
 
1686
        get_scope=None):
1355
1687
        _KnitComponentFile.__init__(self, transport, filename, mode,
1356
1688
                                    file_mode=file_mode,
1357
1689
                                    create_parent_dir=create_parent_dir,
1378
1710
            else:
1379
1711
                self._transport.put_bytes_non_atomic(
1380
1712
                    self._filename, self.HEADER, mode=self._file_mode)
1381
 
 
1382
 
    def get_graph(self):
1383
 
        """Return a list of the node:parents lists from this knit index."""
1384
 
        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]
 
1713
        self._scope = get_scope()
 
1714
        self._get_scope = get_scope
1385
1715
 
1386
1716
    def get_ancestry(self, versions, topo_sorted=True):
1387
1717
        """See VersionedFile.get_ancestry."""
1426
1756
    def get_build_details(self, version_ids):
1427
1757
        """Get the method, index_memo and compression parent for version_ids.
1428
1758
 
 
1759
        Ghosts are omitted from the result.
 
1760
 
1429
1761
        :param version_ids: An iterable of version_ids.
1430
 
        :return: A dict of version_id:(method, index_memo, compression_parent).
 
1762
        :return: A dict of version_id:(index_memo, compression_parent,
 
1763
                                       parents, record_details).
 
1764
            index_memo
 
1765
                opaque structure to pass to read_records to extract the raw
 
1766
                data
 
1767
            compression_parent
 
1768
                Content that this record is built upon, may be None
 
1769
            parents
 
1770
                Logical parents of this node
 
1771
            record_details
 
1772
                extra information about the content which needs to be passed to
 
1773
                Factory.parse_record
1431
1774
        """
1432
1775
        result = {}
1433
1776
        for version_id in version_ids:
 
1777
            if version_id not in self._cache:
 
1778
                # ghosts are omitted
 
1779
                continue
1434
1780
            method = self.get_method(version_id)
 
1781
            parents = self.get_parents_with_ghosts(version_id)
1435
1782
            if method == 'fulltext':
1436
1783
                compression_parent = None
1437
1784
            else:
1438
 
                compression_parent = self.get_parents_with_ghosts(version_id)[0]
 
1785
                compression_parent = parents[0]
 
1786
            noeol = 'no-eol' in self.get_options(version_id)
1439
1787
            index_memo = self.get_position(version_id)
1440
 
            result[version_id] = (method, index_memo, compression_parent)
 
1788
            result[version_id] = (index_memo, compression_parent,
 
1789
                                  parents, (method, noeol))
1441
1790
        return result
1442
1791
 
1443
 
    def iter_parents(self, version_ids):
1444
 
        """Iterate through the parents for many version ids.
1445
 
 
1446
 
        :param version_ids: An iterable yielding version_ids.
1447
 
        :return: An iterator that yields (version_id, parents). Requested 
1448
 
            version_ids not present in the versioned file are simply skipped.
1449
 
            The order is undefined, allowing for different optimisations in
1450
 
            the underlying implementation.
1451
 
        """
1452
 
        for version_id in version_ids:
1453
 
            try:
1454
 
                yield version_id, tuple(self.get_parents(version_id))
1455
 
            except KeyError:
1456
 
                pass
1457
 
 
1458
1792
    def num_versions(self):
1459
1793
        return len(self._history)
1460
1794
 
1499
1833
                                               pos,
1500
1834
                                               size,
1501
1835
                                               self._version_list_to_index(parents))
1502
 
                assert isinstance(line, str), \
1503
 
                    'content must be utf-8 encoded: %r' % (line,)
1504
1836
                lines.append(line)
1505
 
                self._cache_version(version_id, options, pos, size, parents)
 
1837
                self._cache_version(version_id, options, pos, size, tuple(parents))
1506
1838
            if not self._need_to_create:
1507
1839
                self._transport.append_bytes(self._filename, ''.join(lines))
1508
1840
            else:
1557
1889
        """
1558
1890
        return self._cache[version_id][1]
1559
1891
 
1560
 
    def get_parents(self, version_id):
1561
 
        """Return parents of specified version ignoring ghosts."""
1562
 
        return [parent for parent in self._cache[version_id][4] 
1563
 
                if parent in self._cache]
 
1892
    def get_parent_map(self, version_ids):
 
1893
        """Passed through to by KnitVersionedFile.get_parent_map."""
 
1894
        result = {}
 
1895
        for version_id in version_ids:
 
1896
            try:
 
1897
                result[version_id] = tuple(self._cache[version_id][4])
 
1898
            except KeyError:
 
1899
                pass
 
1900
        return result
1564
1901
 
1565
1902
    def get_parents_with_ghosts(self, version_id):
1566
1903
        """Return parents of specified version with ghosts."""
1567
 
        return self._cache[version_id][4] 
 
1904
        try:
 
1905
            return self.get_parent_map([version_id])[version_id]
 
1906
        except KeyError:
 
1907
            raise RevisionNotPresent(version_id, self)
1568
1908
 
1569
1909
    def check_versions_present(self, version_ids):
1570
1910
        """Check that all specified versions are present."""
1596
1936
            raise KnitCorrupt(self, "Cannot do delta compression without "
1597
1937
                "parent tracking.")
1598
1938
 
 
1939
    def _check_write_ok(self):
 
1940
        pass
 
1941
 
1599
1942
    def _get_entries(self, keys, check_present=False):
1600
1943
        """Get the entries for keys.
1601
1944
        
1706
2049
    def get_build_details(self, version_ids):
1707
2050
        """Get the method, index_memo and compression parent for version_ids.
1708
2051
 
 
2052
        Ghosts are omitted from the result.
 
2053
 
1709
2054
        :param version_ids: An iterable of version_ids.
1710
 
        :return: A dict of version_id:(method, index_memo, compression_parent).
 
2055
        :return: A dict of version_id:(index_memo, compression_parent,
 
2056
                                       parents, record_details).
 
2057
            index_memo
 
2058
                opaque structure to pass to read_records to extract the raw
 
2059
                data
 
2060
            compression_parent
 
2061
                Content that this record is built upon, may be None
 
2062
            parents
 
2063
                Logical parents of this node
 
2064
            record_details
 
2065
                extra information about the content which needs to be passed to
 
2066
                Factory.parse_record
1711
2067
        """
1712
2068
        result = {}
1713
2069
        entries = self._get_entries(self._version_ids_to_keys(version_ids), True)
1714
2070
        for entry in entries:
1715
2071
            version_id = self._keys_to_version_ids((entry[1],))[0]
 
2072
            if not self._parents:
 
2073
                parents = ()
 
2074
            else:
 
2075
                parents = self._keys_to_version_ids(entry[3][0])
1716
2076
            if not self._deltas:
1717
2077
                compression_parent = None
1718
2078
            else:
1722
2082
                    (compression_parent_key,))[0]
1723
2083
                else:
1724
2084
                    compression_parent = None
 
2085
            noeol = (entry[2][0] == 'N')
1725
2086
            if compression_parent:
1726
2087
                method = 'line-delta'
1727
2088
            else:
1728
2089
                method = 'fulltext'
1729
 
            result[version_id] = (method, self._node_to_position(entry),
1730
 
                compression_parent)
 
2090
            result[version_id] = (self._node_to_position(entry),
 
2091
                                  compression_parent, parents,
 
2092
                                  (method, noeol))
1731
2093
        return result
1732
2094
 
1733
2095
    def _compression_parent(self, an_entry):
1736
2098
        compression_parents = an_entry[3][1]
1737
2099
        if not compression_parents:
1738
2100
            return None
1739
 
        assert len(compression_parents) == 1
1740
2101
        return compression_parents[0]
1741
2102
 
1742
2103
    def _get_method(self, node):
1747
2108
        else:
1748
2109
            return 'fulltext'
1749
2110
 
1750
 
    def get_graph(self):
1751
 
        """Return a list of the node:parents lists from this knit index."""
1752
 
        if not self._parents:
1753
 
            return [(key, ()) for key in self.get_versions()]
1754
 
        result = []
1755
 
        for index, key, value, refs in self._graph_index.iter_all_entries():
1756
 
            result.append((key[0], tuple([ref[0] for ref in refs[0]])))
1757
 
        return result
1758
 
 
1759
 
    def iter_parents(self, version_ids):
1760
 
        """Iterate through the parents for many version ids.
1761
 
 
1762
 
        :param version_ids: An iterable yielding version_ids.
1763
 
        :return: An iterator that yields (version_id, parents). Requested 
1764
 
            version_ids not present in the versioned file are simply skipped.
1765
 
            The order is undefined, allowing for different optimisations in
1766
 
            the underlying implementation.
1767
 
        """
1768
 
        if self._parents:
1769
 
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
1770
 
            all_parents = set()
1771
 
            present_parents = set()
1772
 
            for node in all_nodes:
1773
 
                all_parents.update(node[3][0])
1774
 
                # any node we are querying must be present
1775
 
                present_parents.add(node[1])
1776
 
            unknown_parents = all_parents.difference(present_parents)
1777
 
            present_parents.update(self._present_keys(unknown_parents))
1778
 
            for node in all_nodes:
1779
 
                parents = []
1780
 
                for parent in node[3][0]:
1781
 
                    if parent in present_parents:
1782
 
                        parents.append(parent[0])
1783
 
                yield node[1][0], tuple(parents)
1784
 
        else:
1785
 
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
1786
 
                yield node[1][0], ()
1787
 
 
1788
2111
    def num_versions(self):
1789
2112
        return len(list(self._graph_index.iter_all_entries()))
1790
2113
 
1836
2159
            options.append('no-eol')
1837
2160
        return options
1838
2161
 
1839
 
    def get_parents(self, version_id):
1840
 
        """Return parents of specified version ignoring ghosts."""
1841
 
        parents = list(self.iter_parents([version_id]))
1842
 
        if not parents:
1843
 
            # missing key
1844
 
            raise errors.RevisionNotPresent(version_id, self)
1845
 
        return parents[0][1]
 
2162
    def get_parent_map(self, version_ids):
 
2163
        """Passed through to by KnitVersionedFile.get_parent_map."""
 
2164
        nodes = self._get_entries(self._version_ids_to_keys(version_ids))
 
2165
        result = {}
 
2166
        if self._parents:
 
2167
            for node in nodes:
 
2168
                result[node[1][0]] = self._keys_to_version_ids(node[3][0])
 
2169
        else:
 
2170
            for node in nodes:
 
2171
                result[node[1][0]] = ()
 
2172
        return result
1846
2173
 
1847
2174
    def get_parents_with_ghosts(self, version_id):
1848
2175
        """Return parents of specified version with ghosts."""
1849
 
        nodes = list(self._get_entries(self._version_ids_to_keys([version_id]),
1850
 
            check_present=True))
1851
 
        if not self._parents:
1852
 
            return ()
1853
 
        return self._keys_to_version_ids(nodes[0][3][0])
 
2176
        try:
 
2177
            return self.get_parent_map([version_id])[version_id]
 
2178
        except KeyError:
 
2179
            raise RevisionNotPresent(version_id, self)
1854
2180
 
1855
2181
    def check_versions_present(self, version_ids):
1856
2182
        """Check that all specified versions are present."""
1947
2273
        self._need_to_create = _need_to_create
1948
2274
        self._create_parent_dir = _create_parent_dir
1949
2275
 
 
2276
    def __repr__(self):
 
2277
        return "%s(%r)" % (self.__class__.__name__,
 
2278
            self._transport.abspath(self._filename))
 
2279
 
1950
2280
    def add_raw_records(self, sizes, raw_data):
1951
2281
        """Add raw knit bytes to a storage area.
1952
2282
 
1958
2288
            tuple - (index, pos, length), where the index field is always None
1959
2289
            for the .knit access method.
1960
2290
        """
1961
 
        assert type(raw_data) == str, \
1962
 
            'data must be plain bytes was %s' % type(raw_data)
1963
2291
        if not self._need_to_create:
1964
2292
            base = self._transport.append_bytes(self._filename, raw_data)
1965
2293
        else:
2042
2370
            tuple - (index, pos, length), where the index field is the 
2043
2371
            write_index object supplied to the PackAccess object.
2044
2372
        """
2045
 
        assert type(raw_data) == str, \
2046
 
            'data must be plain bytes was %s' % type(raw_data)
2047
2373
        result = []
2048
2374
        offset = 0
2049
2375
        for size in sizes:
2132
2458
    def get_raw_records(self, memos_for_retrieval):
2133
2459
        """Get the raw bytes for a records.
2134
2460
 
2135
 
        :param memos_for_retrieval: An iterable containing the (thunk_flag,
2136
 
            index, start, end) memo for retrieving the bytes.
2137
 
        :return: An iterator over the bytes of the records.
 
2461
        :param memos_for_retrieval: An iterable of memos from the
 
2462
            _StreamIndex object identifying bytes to read; for these classes
 
2463
            they are (from_backing_knit, index, start, end) and can point to
 
2464
            either the backing knit or streamed data.
 
2465
        :return: An iterator yielding a byte string for each record in 
 
2466
            memos_for_retrieval.
2138
2467
        """
2139
2468
        # use a generator for memory friendliness
2140
 
        for thunk_flag, version_id, start, end in memos_for_retrieval:
2141
 
            if version_id is self.stream_index:
 
2469
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
 
2470
            if not from_backing_knit:
 
2471
                if version_id is not self.stream_index:
 
2472
                    raise AssertionError()
2142
2473
                yield self.data[start:end]
2143
2474
                continue
2144
2475
            # we have been asked to thunk. This thunking only occurs when
2149
2480
            # as desired. However, for now, this is sufficient.
2150
2481
            if self.orig_factory.__class__ != KnitPlainFactory:
2151
2482
                raise errors.KnitCorrupt(
2152
 
                    self, 'Bad thunk request %r' % version_id)
 
2483
                    self, 'Bad thunk request %r cannot be backed by %r' %
 
2484
                        (version_id, self.orig_factory))
2153
2485
            lines = self.backing_knit.get_lines(version_id)
2154
2486
            line_bytes = ''.join(lines)
2155
2487
            digest = sha_string(line_bytes)
 
2488
            # the packed form of the fulltext always has a trailing newline,
 
2489
            # even if the actual text does not, unless the file is empty.  the
 
2490
            # record options including the noeol flag are passed through by
 
2491
            # _StreamIndex, so this is safe.
2156
2492
            if lines:
2157
2493
                if lines[-1][-1] != '\n':
2158
2494
                    lines[-1] = lines[-1] + '\n'
2159
2495
                    line_bytes += '\n'
2160
 
            orig_options = list(self.backing_knit._index.get_options(version_id))
2161
 
            if 'fulltext' not in orig_options:
2162
 
                if 'line-delta' not in orig_options:
2163
 
                    raise errors.KnitCorrupt(self,
2164
 
                        'Unknown compression method %r' % orig_options)
2165
 
                orig_options.remove('line-delta')
2166
 
                orig_options.append('fulltext')
2167
2496
            # We want plain data, because we expect to thunk only to allow text
2168
2497
            # extraction.
2169
2498
            size, bytes = self.backing_knit._data._record_to_data(version_id,
2174
2503
class _StreamIndex(object):
2175
2504
    """A Knit Index object that uses the data map from a datastream."""
2176
2505
 
2177
 
    def __init__(self, data_list):
 
2506
    def __init__(self, data_list, backing_index):
2178
2507
        """Create a _StreamIndex object.
2179
2508
 
2180
2509
        :param data_list: The data_list from the datastream.
 
2510
        :param backing_index: The index which will supply values for nodes
 
2511
            referenced outside of this stream.
2181
2512
        """
2182
2513
        self.data_list = data_list
 
2514
        self.backing_index = backing_index
2183
2515
        self._by_version = {}
2184
2516
        pos = 0
2185
2517
        for key, options, length, parents in data_list:
2212
2544
    def get_build_details(self, version_ids):
2213
2545
        """Get the method, index_memo and compression parent for version_ids.
2214
2546
 
 
2547
        Ghosts are omitted from the result.
 
2548
 
2215
2549
        :param version_ids: An iterable of version_ids.
2216
 
        :return: A dict of version_id:(method, index_memo, compression_parent).
 
2550
        :return: A dict of version_id:(index_memo, compression_parent,
 
2551
                                       parents, record_details).
 
2552
            index_memo
 
2553
                opaque memo that can be passed to _StreamAccess.read_records
 
2554
                to extract the raw data; for these classes it is
 
2555
                (from_backing_knit, index, start, end) 
 
2556
            compression_parent
 
2557
                Content that this record is built upon, may be None
 
2558
            parents
 
2559
                Logical parents of this node
 
2560
            record_details
 
2561
                extra information about the content which needs to be passed to
 
2562
                Factory.parse_record
2217
2563
        """
2218
2564
        result = {}
2219
2565
        for version_id in version_ids:
2220
 
            method = self.get_method(version_id)
 
2566
            try:
 
2567
                method = self.get_method(version_id)
 
2568
            except errors.RevisionNotPresent:
 
2569
                # ghosts are omitted
 
2570
                continue
 
2571
            parent_ids = self.get_parents_with_ghosts(version_id)
 
2572
            noeol = ('no-eol' in self.get_options(version_id))
 
2573
            index_memo = self.get_position(version_id)
 
2574
            from_backing_knit = index_memo[0]
 
2575
            if from_backing_knit:
 
2576
                # texts retrieved from the backing knit are always full texts
 
2577
                method = 'fulltext'
2221
2578
            if method == 'fulltext':
2222
2579
                compression_parent = None
2223
2580
            else:
2224
 
                compression_parent = self.get_parents_with_ghosts(version_id)[0]
2225
 
            index_memo = self.get_position(version_id)
2226
 
            result[version_id] = (method, index_memo, compression_parent)
 
2581
                compression_parent = parent_ids[0]
 
2582
            result[version_id] = (index_memo, compression_parent,
 
2583
                                  parent_ids, (method, noeol))
2227
2584
        return result
2228
2585
 
2229
2586
    def get_method(self, version_id):
2230
2587
        """Return compression method of specified version."""
2231
 
        try:
2232
 
            options = self._by_version[version_id][0]
2233
 
        except KeyError:
2234
 
            # Strictly speaking this should check in the backing knit, but
2235
 
            # until we have a test to discriminate, this will do.
2236
 
            return 'fulltext'
 
2588
        options = self.get_options(version_id)
2237
2589
        if 'fulltext' in options:
2238
2590
            return 'fulltext'
2239
2591
        elif 'line-delta' in options:
2246
2598
 
2247
2599
        e.g. ['foo', 'bar']
2248
2600
        """
2249
 
        return self._by_version[version_id][0]
 
2601
        try:
 
2602
            return self._by_version[version_id][0]
 
2603
        except KeyError:
 
2604
            options = list(self.backing_index.get_options(version_id))
 
2605
            if 'fulltext' in options:
 
2606
                pass
 
2607
            elif 'line-delta' in options:
 
2608
                # Texts from the backing knit are always returned from the stream
 
2609
                # as full texts
 
2610
                options.remove('line-delta')
 
2611
                options.append('fulltext')
 
2612
            else:
 
2613
                raise errors.KnitIndexUnknownMethod(self, options)
 
2614
            return tuple(options)
 
2615
 
 
2616
    def get_parent_map(self, version_ids):
 
2617
        """Passed through to by KnitVersionedFile.get_parent_map."""
 
2618
        result = {}
 
2619
        pending_ids = set()
 
2620
        for version_id in version_ids:
 
2621
            try:
 
2622
                result[version_id] = self._by_version[version_id][2]
 
2623
            except KeyError:
 
2624
                pending_ids.add(version_id)
 
2625
        result.update(self.backing_index.get_parent_map(pending_ids))
 
2626
        return result
2250
2627
 
2251
2628
    def get_parents_with_ghosts(self, version_id):
2252
2629
        """Return parents of specified version with ghosts."""
2253
 
        return self._by_version[version_id][2]
 
2630
        try:
 
2631
            return self.get_parent_map([version_id])[version_id]
 
2632
        except KeyError:
 
2633
            raise RevisionNotPresent(version_id, self)
2254
2634
 
2255
2635
    def get_position(self, version_id):
2256
2636
        """Return details needed to access the version.
2259
2639
        coordinates into that (as index_memo's are opaque outside the
2260
2640
        index and matching access class).
2261
2641
 
2262
 
        :return: a tuple (thunk_flag, index, start, end).  If thunk_flag is
2263
 
            False, index will be self, otherwise it will be a version id.
 
2642
        :return: a tuple (from_backing_knit, index, start, end) that can 
 
2643
            be passed e.g. to get_raw_records.  
 
2644
            If from_backing_knit is False, index will be self, otherwise it
 
2645
            will be a version id.
2264
2646
        """
2265
2647
        try:
2266
2648
            start, end = self._by_version[version_id][1]
2273
2655
        """Get all the versions in the stream."""
2274
2656
        return self._by_version.keys()
2275
2657
 
2276
 
    def iter_parents(self, version_ids):
2277
 
        """Iterate through the parents for many version ids.
2278
 
 
2279
 
        :param version_ids: An iterable yielding version_ids.
2280
 
        :return: An iterator that yields (version_id, parents). Requested 
2281
 
            version_ids not present in the versioned file are simply skipped.
2282
 
            The order is undefined, allowing for different optimisations in
2283
 
            the underlying implementation.
2284
 
        """
2285
 
        result = []
2286
 
        for version in version_ids:
2287
 
            try:
2288
 
                result.append((version, self._by_version[version][2]))
2289
 
            except KeyError:
2290
 
                pass
2291
 
        return result
2292
 
 
2293
2658
 
2294
2659
class _KnitData(object):
2295
2660
    """Manage extraction of data from a KnitAccess, caching and decompressing.
2307
2672
        """
2308
2673
        self._access = access
2309
2674
        self._checked = False
2310
 
        # TODO: jam 20060713 conceptually, this could spill to disk
2311
 
        #       if the cached size gets larger than a certain amount
2312
 
        #       but it complicates the model a bit, so for now just use
2313
 
        #       a simple dictionary
2314
 
        self._cache = {}
2315
 
        self._do_cache = False
2316
 
 
2317
 
    def enable_cache(self):
2318
 
        """Enable caching of reads."""
2319
 
        self._do_cache = True
2320
 
 
2321
 
    def clear_cache(self):
2322
 
        """Clear the record cache."""
2323
 
        self._do_cache = False
2324
 
        self._cache = {}
2325
2675
 
2326
2676
    def _open_file(self):
2327
2677
        return self._access.open_file()
2346
2696
                                     digest)],
2347
2697
            dense_lines or lines,
2348
2698
            ["end %s\n" % version_id]))
2349
 
        assert bytes.__class__ == str
 
2699
        if lines and lines[-1][-1] != '\n':
 
2700
            raise ValueError('corrupt lines value %r' % lines)
2350
2701
        compressed_bytes = bytes_to_gzip(bytes)
2351
2702
        return len(compressed_bytes), compressed_bytes
2352
2703
 
2376
2727
                              % (version_id, e.__class__.__name__, str(e)))
2377
2728
        return df, rec
2378
2729
 
2379
 
    def _check_header(self, version_id, line):
 
2730
    def _split_header(self, line):
2380
2731
        rec = line.split()
2381
2732
        if len(rec) != 4:
2382
2733
            raise KnitCorrupt(self._access,
2383
2734
                              'unexpected number of elements in record header')
 
2735
        return rec
 
2736
 
 
2737
    def _check_header_version(self, rec, version_id):
2384
2738
        if rec[1] != version_id:
2385
2739
            raise KnitCorrupt(self._access,
2386
2740
                              'unexpected version, wanted %r, got %r'
2387
2741
                              % (version_id, rec[1]))
 
2742
 
 
2743
    def _check_header(self, version_id, line):
 
2744
        rec = self._split_header(line)
 
2745
        self._check_header_version(rec, version_id)
2388
2746
        return rec
2389
2747
 
2390
 
    def _parse_record(self, version_id, data):
 
2748
    def _parse_record_unchecked(self, data):
2391
2749
        # profiling notes:
2392
2750
        # 4168 calls in 2880 217 internal
2393
2751
        # 4168 calls to _parse_record_header in 2121
2394
2752
        # 4168 calls to readlines in 330
2395
2753
        df = GzipFile(mode='rb', fileobj=StringIO(data))
2396
 
 
2397
2754
        try:
2398
2755
            record_contents = df.readlines()
2399
2756
        except Exception, e:
2400
 
            raise KnitCorrupt(self._access,
2401
 
                              "While reading {%s} got %s(%s)"
2402
 
                              % (version_id, e.__class__.__name__, str(e)))
 
2757
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
 
2758
                (data, e.__class__.__name__, str(e)))
2403
2759
        header = record_contents.pop(0)
2404
 
        rec = self._check_header(version_id, header)
2405
 
 
 
2760
        rec = self._split_header(header)
2406
2761
        last_line = record_contents.pop()
2407
2762
        if len(record_contents) != int(rec[2]):
2408
2763
            raise KnitCorrupt(self._access,
2409
2764
                              'incorrect number of lines %s != %s'
2410
2765
                              ' for version {%s}'
2411
2766
                              % (len(record_contents), int(rec[2]),
2412
 
                                 version_id))
 
2767
                                 rec[1]))
2413
2768
        if last_line != 'end %s\n' % rec[1]:
2414
2769
            raise KnitCorrupt(self._access,
2415
2770
                              'unexpected version end line %r, wanted %r' 
2416
 
                              % (last_line, version_id))
 
2771
                              % (last_line, rec[1]))
2417
2772
        df.close()
 
2773
        return rec, record_contents
 
2774
 
 
2775
    def _parse_record(self, version_id, data):
 
2776
        rec, record_contents = self._parse_record_unchecked(data)
 
2777
        self._check_header_version(rec, version_id)
2418
2778
        return record_contents, rec[3]
2419
2779
 
2420
2780
    def read_records_iter_raw(self, records):
2422
2782
 
2423
2783
        This unpacks enough of the text record to validate the id is
2424
2784
        as expected but thats all.
 
2785
 
 
2786
        Each item the iterator yields is (version_id, bytes,
 
2787
        sha1_of_full_text).
2425
2788
        """
2426
2789
        # setup an iterator of the external records:
2427
2790
        # uses readv so nice and fast we hope.
2428
2791
        if len(records):
2429
2792
            # grab the disk data needed.
2430
 
            if self._cache:
2431
 
                # Don't check _cache if it is empty
2432
 
                needed_offsets = [index_memo for version_id, index_memo
2433
 
                                              in records
2434
 
                                              if version_id not in self._cache]
2435
 
            else:
2436
 
                needed_offsets = [index_memo for version_id, index_memo
2437
 
                                               in records]
2438
 
 
 
2793
            needed_offsets = [index_memo for version_id, index_memo
 
2794
                                           in records]
2439
2795
            raw_records = self._access.get_raw_records(needed_offsets)
2440
2796
 
2441
2797
        for version_id, index_memo in records:
2442
 
            if version_id in self._cache:
2443
 
                # This data has already been validated
2444
 
                data = self._cache[version_id]
2445
 
            else:
2446
 
                data = raw_records.next()
2447
 
                if self._do_cache:
2448
 
                    self._cache[version_id] = data
2449
 
 
2450
 
                # validate the header
2451
 
                df, rec = self._parse_record_header(version_id, data)
2452
 
                df.close()
2453
 
            yield version_id, data
 
2798
            data = raw_records.next()
 
2799
            # validate the header
 
2800
            df, rec = self._parse_record_header(version_id, data)
 
2801
            df.close()
 
2802
            yield version_id, data, rec[3]
2454
2803
 
2455
2804
    def read_records_iter(self, records):
2456
2805
        """Read text records from data file and yield result.
2465
2814
        if not records:
2466
2815
            return
2467
2816
 
2468
 
        if self._cache:
2469
 
            # Skip records we have alread seen
2470
 
            yielded_records = set()
2471
 
            needed_records = set()
2472
 
            for record in records:
2473
 
                if record[0] in self._cache:
2474
 
                    if record[0] in yielded_records:
2475
 
                        continue
2476
 
                    yielded_records.add(record[0])
2477
 
                    data = self._cache[record[0]]
2478
 
                    content, digest = self._parse_record(record[0], data)
2479
 
                    yield (record[0], content, digest)
2480
 
                else:
2481
 
                    needed_records.add(record)
2482
 
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
2483
 
        else:
2484
 
            needed_records = sorted(set(records), key=operator.itemgetter(1))
2485
 
 
 
2817
        needed_records = sorted(set(records), key=operator.itemgetter(1))
2486
2818
        if not needed_records:
2487
2819
            return
2488
2820
 
2494
2826
        for (version_id, index_memo), data in \
2495
2827
                izip(iter(needed_records), raw_data):
2496
2828
            content, digest = self._parse_record(version_id, data)
2497
 
            if self._do_cache:
2498
 
                self._cache[version_id] = data
2499
2829
            yield version_id, content, digest
2500
2830
 
2501
2831
    def read_records(self, records):
2510
2840
class InterKnit(InterVersionedFile):
2511
2841
    """Optimised code paths for knit to knit operations."""
2512
2842
    
2513
 
    _matching_file_from_factory = KnitVersionedFile
2514
 
    _matching_file_to_factory = KnitVersionedFile
 
2843
    _matching_file_from_factory = staticmethod(make_file_knit)
 
2844
    _matching_file_to_factory = staticmethod(make_file_knit)
2515
2845
    
2516
2846
    @staticmethod
2517
2847
    def is_compatible(source, target):
2528
2858
        see join() for the parameter definitions.
2529
2859
        """
2530
2860
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2531
 
        graph = self.source.get_graph(version_ids)
2532
 
        order = topo_sort(graph.items())
 
2861
        # --- the below is factorable out with VersionedFile.join, but wait for
 
2862
        # VersionedFiles, it may all be simpler then.
 
2863
        graph = Graph(self.source)
 
2864
        search = graph._make_breadth_first_searcher(version_ids)
 
2865
        transitive_ids = set()
 
2866
        map(transitive_ids.update, list(search))
 
2867
        parent_map = self.source.get_parent_map(transitive_ids)
 
2868
        order = topo_sort(parent_map.items())
2533
2869
 
2534
2870
        def size_of_content(content):
2535
2871
            return sum(len(line) for line in content.text())
2561
2897
 
2562
2898
    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2563
2899
        """See InterVersionedFile.join."""
2564
 
        assert isinstance(self.source, KnitVersionedFile)
2565
 
        assert isinstance(self.target, KnitVersionedFile)
2566
 
 
2567
2900
        # If the source and target are mismatched w.r.t. annotations vs
2568
2901
        # plain, the data needs to be converted accordingly
2569
2902
        if self.source.factory.annotated == self.target.factory.annotated:
2596
2929
    
2597
2930
            if not needed_versions:
2598
2931
                return 0
2599
 
            full_list = topo_sort(self.source.get_graph())
 
2932
            full_list = topo_sort(
 
2933
                self.source.get_parent_map(self.source.versions()))
2600
2934
    
2601
2935
            version_list = [i for i in full_list if (not self.target.has_version(i)
2602
2936
                            and i in needed_versions)]
2614
2948
                    # * already have it or
2615
2949
                    # * have it scheduled already
2616
2950
                    # otherwise we don't care
2617
 
                    assert (self.target.has_version(parent) or
 
2951
                    if not (self.target.has_version(parent) or
2618
2952
                            parent in copy_set or
2619
 
                            not self.source.has_version(parent))
 
2953
                            not self.source.has_version(parent)):
 
2954
                        raise AssertionError("problem joining parent %r "
 
2955
                            "from %r to %r"
 
2956
                            % (parent, self.source, self.target))
2620
2957
                index_memo = self.source._index.get_position(version_id)
2621
2958
                copy_queue_records.append((version_id, index_memo))
2622
2959
                copy_queue.append((version_id, options, parents))
2627
2964
            total = len(version_list)
2628
2965
            raw_datum = []
2629
2966
            raw_records = []
2630
 
            for (version_id, raw_data), \
 
2967
            for (version_id, raw_data, _), \
2631
2968
                (version_id2, options, parents) in \
2632
2969
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
2633
2970
                     copy_queue):
2634
 
                assert version_id == version_id2, 'logic error, inconsistent results'
 
2971
                if not (version_id == version_id2):
 
2972
                    raise AssertionError('logic error, inconsistent results')
2635
2973
                count = count + 1
2636
2974
                pb.update("Joining knit", count, total)
2637
2975
                if converter:
2667
3005
    """Optimised code paths for weave to knit operations."""
2668
3006
    
2669
3007
    _matching_file_from_factory = bzrlib.weave.WeaveFile
2670
 
    _matching_file_to_factory = KnitVersionedFile
 
3008
    _matching_file_to_factory = staticmethod(make_file_knit)
2671
3009
    
2672
3010
    @staticmethod
2673
3011
    def is_compatible(source, target):
2680
3018
 
2681
3019
    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2682
3020
        """See InterVersionedFile.join."""
2683
 
        assert isinstance(self.source, bzrlib.weave.Weave)
2684
 
        assert isinstance(self.target, KnitVersionedFile)
2685
 
 
2686
3021
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2687
3022
 
2688
3023
        if not version_ids:
2698
3033
    
2699
3034
            if not needed_versions:
2700
3035
                return 0
2701
 
            full_list = topo_sort(self.source.get_graph())
 
3036
            full_list = topo_sort(
 
3037
                self.source.get_parent_map(self.source.versions()))
2702
3038
    
2703
3039
            version_list = [i for i in full_list if (not self.target.has_version(i)
2704
3040
                            and i in needed_versions)]
2706
3042
            # do the join:
2707
3043
            count = 0
2708
3044
            total = len(version_list)
 
3045
            parent_map = self.source.get_parent_map(version_list)
2709
3046
            for version_id in version_list:
2710
3047
                pb.update("Converting to knit", count, total)
2711
 
                parents = self.source.get_parents(version_id)
 
3048
                parents = parent_map[version_id]
2712
3049
                # check that its will be a consistent copy:
2713
3050
                for parent in parents:
2714
3051
                    # if source has the parent, we must already have it
2715
 
                    assert (self.target.has_version(parent))
 
3052
                    if not self.target.has_version(parent):
 
3053
                        raise AssertionError("%r does not have parent %r"
 
3054
                            % (self.target, parent))
2716
3055
                self.target.add_lines(
2717
3056
                    version_id, parents, self.source.get_lines(version_id))
2718
3057
                count = count + 1
2735
3074
    It will work for knits with cached annotations, but this is not
2736
3075
    recommended.
2737
3076
    """
2738
 
    ancestry = knit.get_ancestry(revision_id)
2739
 
    fulltext = dict(zip(ancestry, knit.get_line_list(ancestry)))
2740
 
    annotations = {}
2741
 
    for candidate in ancestry:
2742
 
        if candidate in annotations:
2743
 
            continue
2744
 
        parents = knit.get_parents(candidate)
2745
 
        if len(parents) == 0:
2746
 
            blocks = None
2747
 
        elif knit._index.get_method(candidate) != 'line-delta':
2748
 
            blocks = None
 
3077
    annotator = _KnitAnnotator(knit)
 
3078
    return iter(annotator.annotate(revision_id))
 
3079
 
 
3080
 
 
3081
class _KnitAnnotator(object):
 
3082
    """Build up the annotations for a text."""
 
3083
 
 
3084
    def __init__(self, knit):
 
3085
        self._knit = knit
 
3086
 
 
3087
        # Content objects, differs from fulltexts because of how final newlines
 
3088
        # are treated by knits. the content objects here will always have a
 
3089
        # final newline
 
3090
        self._fulltext_contents = {}
 
3091
 
 
3092
        # Annotated lines of specific revisions
 
3093
        self._annotated_lines = {}
 
3094
 
 
3095
        # Track the raw data for nodes that we could not process yet.
 
3096
        # This maps the revision_id of the base to a list of children that will
 
3097
        # annotated from it.
 
3098
        self._pending_children = {}
 
3099
 
 
3100
        # Nodes which cannot be extracted
 
3101
        self._ghosts = set()
 
3102
 
 
3103
        # Track how many children this node has, so we know if we need to keep
 
3104
        # it
 
3105
        self._annotate_children = {}
 
3106
        self._compression_children = {}
 
3107
 
 
3108
        self._all_build_details = {}
 
3109
        # The children => parent revision_id graph
 
3110
        self._revision_id_graph = {}
 
3111
 
 
3112
        self._heads_provider = None
 
3113
 
 
3114
        self._nodes_to_keep_annotations = set()
 
3115
        self._generations_until_keep = 100
 
3116
 
 
3117
    def set_generations_until_keep(self, value):
 
3118
        """Set the number of generations before caching a node.
 
3119
 
 
3120
        Setting this to -1 will cache every merge node, setting this higher
 
3121
        will cache fewer nodes.
 
3122
        """
 
3123
        self._generations_until_keep = value
 
3124
 
 
3125
    def _add_fulltext_content(self, revision_id, content_obj):
 
3126
        self._fulltext_contents[revision_id] = content_obj
 
3127
        # TODO: jam 20080305 It might be good to check the sha1digest here
 
3128
        return content_obj.text()
 
3129
 
 
3130
    def _check_parents(self, child, nodes_to_annotate):
 
3131
        """Check if all parents have been processed.
 
3132
 
 
3133
        :param child: A tuple of (rev_id, parents, raw_content)
 
3134
        :param nodes_to_annotate: If child is ready, add it to
 
3135
            nodes_to_annotate, otherwise put it back in self._pending_children
 
3136
        """
 
3137
        for parent_id in child[1]:
 
3138
            if (parent_id not in self._annotated_lines):
 
3139
                # This parent is present, but another parent is missing
 
3140
                self._pending_children.setdefault(parent_id,
 
3141
                                                  []).append(child)
 
3142
                break
2749
3143
        else:
2750
 
            parent, sha1, noeol, delta = knit.get_delta(candidate)
2751
 
            blocks = KnitContent.get_line_delta_blocks(delta,
2752
 
                fulltext[parents[0]], fulltext[candidate])
2753
 
        annotations[candidate] = list(annotate.reannotate([annotations[p]
2754
 
            for p in parents], fulltext[candidate], candidate, blocks))
2755
 
    return iter(annotations[revision_id])
 
3144
            # This one is ready to be processed
 
3145
            nodes_to_annotate.append(child)
 
3146
 
 
3147
    def _add_annotation(self, revision_id, fulltext, parent_ids,
 
3148
                        left_matching_blocks=None):
 
3149
        """Add an annotation entry.
 
3150
 
 
3151
        All parents should already have been annotated.
 
3152
        :return: A list of children that now have their parents satisfied.
 
3153
        """
 
3154
        a = self._annotated_lines
 
3155
        annotated_parent_lines = [a[p] for p in parent_ids]
 
3156
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
 
3157
            fulltext, revision_id, left_matching_blocks,
 
3158
            heads_provider=self._get_heads_provider()))
 
3159
        self._annotated_lines[revision_id] = annotated_lines
 
3160
        for p in parent_ids:
 
3161
            ann_children = self._annotate_children[p]
 
3162
            ann_children.remove(revision_id)
 
3163
            if (not ann_children
 
3164
                and p not in self._nodes_to_keep_annotations):
 
3165
                del self._annotated_lines[p]
 
3166
                del self._all_build_details[p]
 
3167
                if p in self._fulltext_contents:
 
3168
                    del self._fulltext_contents[p]
 
3169
        # Now that we've added this one, see if there are any pending
 
3170
        # deltas to be done, certainly this parent is finished
 
3171
        nodes_to_annotate = []
 
3172
        for child in self._pending_children.pop(revision_id, []):
 
3173
            self._check_parents(child, nodes_to_annotate)
 
3174
        return nodes_to_annotate
 
3175
 
 
3176
    def _get_build_graph(self, revision_id):
 
3177
        """Get the graphs for building texts and annotations.
 
3178
 
 
3179
        The data you need for creating a full text may be different than the
 
3180
        data you need to annotate that text. (At a minimum, you need both
 
3181
        parents to create an annotation, but only need 1 parent to generate the
 
3182
        fulltext.)
 
3183
 
 
3184
        :return: A list of (revision_id, index_memo) records, suitable for
 
3185
            passing to read_records_iter to start reading in the raw data fro/
 
3186
            the pack file.
 
3187
        """
 
3188
        if revision_id in self._annotated_lines:
 
3189
            # Nothing to do
 
3190
            return []
 
3191
        pending = set([revision_id])
 
3192
        records = []
 
3193
        generation = 0
 
3194
        kept_generation = 0
 
3195
        while pending:
 
3196
            # get all pending nodes
 
3197
            generation += 1
 
3198
            this_iteration = pending
 
3199
            build_details = self._knit._index.get_build_details(this_iteration)
 
3200
            self._all_build_details.update(build_details)
 
3201
            # new_nodes = self._knit._index._get_entries(this_iteration)
 
3202
            pending = set()
 
3203
            for rev_id, details in build_details.iteritems():
 
3204
                (index_memo, compression_parent, parents,
 
3205
                 record_details) = details
 
3206
                self._revision_id_graph[rev_id] = parents
 
3207
                records.append((rev_id, index_memo))
 
3208
                # Do we actually need to check _annotated_lines?
 
3209
                pending.update(p for p in parents
 
3210
                                 if p not in self._all_build_details)
 
3211
                if compression_parent:
 
3212
                    self._compression_children.setdefault(compression_parent,
 
3213
                        []).append(rev_id)
 
3214
                if parents:
 
3215
                    for parent in parents:
 
3216
                        self._annotate_children.setdefault(parent,
 
3217
                            []).append(rev_id)
 
3218
                    num_gens = generation - kept_generation
 
3219
                    if ((num_gens >= self._generations_until_keep)
 
3220
                        and len(parents) > 1):
 
3221
                        kept_generation = generation
 
3222
                        self._nodes_to_keep_annotations.add(rev_id)
 
3223
 
 
3224
            missing_versions = this_iteration.difference(build_details.keys())
 
3225
            self._ghosts.update(missing_versions)
 
3226
            for missing_version in missing_versions:
 
3227
                # add a key, no parents
 
3228
                self._revision_id_graph[missing_version] = ()
 
3229
                pending.discard(missing_version) # don't look for it
 
3230
        if self._ghosts.intersection(self._compression_children):
 
3231
            raise KnitCorrupt(
 
3232
                "We cannot have nodes which have a ghost compression parent:\n"
 
3233
                "ghosts: %r\n"
 
3234
                "compression children: %r"
 
3235
                % (self._ghosts, self._compression_children))
 
3236
        # Cleanout anything that depends on a ghost so that we don't wait for
 
3237
        # the ghost to show up
 
3238
        for node in self._ghosts:
 
3239
            if node in self._annotate_children:
 
3240
                # We won't be building this node
 
3241
                del self._annotate_children[node]
 
3242
        # Generally we will want to read the records in reverse order, because
 
3243
        # we find the parent nodes after the children
 
3244
        records.reverse()
 
3245
        return records
 
3246
 
 
3247
    def _annotate_records(self, records):
 
3248
        """Build the annotations for the listed records."""
 
3249
        # We iterate in the order read, rather than a strict order requested
 
3250
        # However, process what we can, and put off to the side things that
 
3251
        # still need parents, cleaning them up when those parents are
 
3252
        # processed.
 
3253
        for (rev_id, record,
 
3254
             digest) in self._knit._data.read_records_iter(records):
 
3255
            if rev_id in self._annotated_lines:
 
3256
                continue
 
3257
            parent_ids = self._revision_id_graph[rev_id]
 
3258
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
 
3259
            details = self._all_build_details[rev_id]
 
3260
            (index_memo, compression_parent, parents,
 
3261
             record_details) = details
 
3262
            nodes_to_annotate = []
 
3263
            # TODO: Remove the punning between compression parents, and
 
3264
            #       parent_ids, we should be able to do this without assuming
 
3265
            #       the build order
 
3266
            if len(parent_ids) == 0:
 
3267
                # There are no parents for this node, so just add it
 
3268
                # TODO: This probably needs to be decoupled
 
3269
                fulltext_content, delta = self._knit.factory.parse_record(
 
3270
                    rev_id, record, record_details, None)
 
3271
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
 
3272
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
 
3273
                    parent_ids, left_matching_blocks=None))
 
3274
            else:
 
3275
                child = (rev_id, parent_ids, record)
 
3276
                # Check if all the parents are present
 
3277
                self._check_parents(child, nodes_to_annotate)
 
3278
            while nodes_to_annotate:
 
3279
                # Should we use a queue here instead of a stack?
 
3280
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
 
3281
                (index_memo, compression_parent, parents,
 
3282
                 record_details) = self._all_build_details[rev_id]
 
3283
                if compression_parent is not None:
 
3284
                    comp_children = self._compression_children[compression_parent]
 
3285
                    if rev_id not in comp_children:
 
3286
                        raise AssertionError("%r not in compression children %r"
 
3287
                            % (rev_id, comp_children))
 
3288
                    # If there is only 1 child, it is safe to reuse this
 
3289
                    # content
 
3290
                    reuse_content = (len(comp_children) == 1
 
3291
                        and compression_parent not in
 
3292
                            self._nodes_to_keep_annotations)
 
3293
                    if reuse_content:
 
3294
                        # Remove it from the cache since it will be changing
 
3295
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
 
3296
                        # Make sure to copy the fulltext since it might be
 
3297
                        # modified
 
3298
                        parent_fulltext = list(parent_fulltext_content.text())
 
3299
                    else:
 
3300
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
 
3301
                        parent_fulltext = parent_fulltext_content.text()
 
3302
                    comp_children.remove(rev_id)
 
3303
                    fulltext_content, delta = self._knit.factory.parse_record(
 
3304
                        rev_id, record, record_details,
 
3305
                        parent_fulltext_content,
 
3306
                        copy_base_content=(not reuse_content))
 
3307
                    fulltext = self._add_fulltext_content(rev_id,
 
3308
                                                          fulltext_content)
 
3309
                    blocks = KnitContent.get_line_delta_blocks(delta,
 
3310
                            parent_fulltext, fulltext)
 
3311
                else:
 
3312
                    fulltext_content = self._knit.factory.parse_fulltext(
 
3313
                        record, rev_id)
 
3314
                    fulltext = self._add_fulltext_content(rev_id,
 
3315
                        fulltext_content)
 
3316
                    blocks = None
 
3317
                nodes_to_annotate.extend(
 
3318
                    self._add_annotation(rev_id, fulltext, parent_ids,
 
3319
                                     left_matching_blocks=blocks))
 
3320
 
 
3321
    def _get_heads_provider(self):
 
3322
        """Create a heads provider for resolving ancestry issues."""
 
3323
        if self._heads_provider is not None:
 
3324
            return self._heads_provider
 
3325
        parent_provider = _mod_graph.DictParentsProvider(
 
3326
            self._revision_id_graph)
 
3327
        graph_obj = _mod_graph.Graph(parent_provider)
 
3328
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
 
3329
        self._heads_provider = head_cache
 
3330
        return head_cache
 
3331
 
 
3332
    def annotate(self, revision_id):
 
3333
        """Return the annotated fulltext at the given revision.
 
3334
 
 
3335
        :param revision_id: The revision id for this file
 
3336
        """
 
3337
        records = self._get_build_graph(revision_id)
 
3338
        if revision_id in self._ghosts:
 
3339
            raise errors.RevisionNotPresent(revision_id, self._knit)
 
3340
        self._annotate_records(records)
 
3341
        return self._annotated_lines[revision_id]
2756
3342
 
2757
3343
 
2758
3344
try: