/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/knit.py

  • Committer: Jelmer Vernooij
  • Date: 2019-02-17 03:45:31 UTC
  • mto: (7290.1.6 work)
  • mto: This revision was merged to the branch mainline in revision 7295.
  • Revision ID: jelmer@jelmer.uk-20190217034531-vw7oc2bo5hdd41jn
Drop documentation about removed pkgimport.conf.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Knit versionedfile implementation.
 
18
 
 
19
A knit is a versioned file implementation that supports efficient append only
 
20
updates.
 
21
 
 
22
Knit file layout:
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
 
26
end-marker; simply "end VERSION"
 
27
 
 
28
delta can be line or full contents.a
 
29
... the 8's there are the index number of the annotation.
 
30
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
 
31
59,59,3
 
32
8
 
33
8         if ie.executable:
 
34
8             e.set('executable', 'yes')
 
35
130,130,2
 
36
8         if elt.get('executable') == 'yes':
 
37
8             ie.executable = True
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
 
39
 
 
40
 
 
41
whats in an index:
 
42
09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
 
43
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
 
44
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
 
45
09:33 < lifeless> right
 
46
09:33 < jrydberg> lifeless: the position and size is the range in the data file
 
47
 
 
48
 
 
49
so the index sequence is the dictionary compressed sequence number used
 
50
in the deltas to provide line annotation
 
51
 
 
52
"""
 
53
 
 
54
from __future__ import absolute_import
 
55
 
 
56
import operator
 
57
import os
 
58
 
 
59
from ..lazy_import import lazy_import
 
60
lazy_import(globals(), """
 
61
import gzip
 
62
 
 
63
from breezy import (
 
64
    debug,
 
65
    diff,
 
66
    patiencediff,
 
67
    static_tuple,
 
68
    trace,
 
69
    tsort,
 
70
    tuned_gzip,
 
71
    ui,
 
72
    )
 
73
from breezy.bzr import (
 
74
    index as _mod_index,
 
75
    pack,
 
76
    )
 
77
 
 
78
from breezy.bzr import pack_repo
 
79
from breezy.i18n import gettext
 
80
""")
 
81
from .. import (
 
82
    annotate,
 
83
    errors,
 
84
    osutils,
 
85
    )
 
86
from ..errors import (
 
87
    InternalBzrError,
 
88
    InvalidRevisionId,
 
89
    NoSuchFile,
 
90
    RevisionNotPresent,
 
91
    )
 
92
from ..osutils import (
 
93
    contains_whitespace,
 
94
    sha_string,
 
95
    sha_strings,
 
96
    split_lines,
 
97
    )
 
98
from ..sixish import (
 
99
    BytesIO,
 
100
    range,
 
101
    viewitems,
 
102
    viewvalues,
 
103
    )
 
104
from ..bzr.versionedfile import (
 
105
    _KeyRefs,
 
106
    AbsentContentFactory,
 
107
    adapter_registry,
 
108
    ConstantMapper,
 
109
    ContentFactory,
 
110
    sort_groupcompress,
 
111
    VersionedFilesWithFallbacks,
 
112
    )
 
113
 
 
114
 
 
115
# TODO: Split out code specific to this format into an associated object.
 
116
 
 
117
# TODO: Can we put in some kind of value to check that the index and data
 
118
# files belong together?
 
119
 
 
120
# TODO: accommodate binaries, perhaps by storing a byte count
 
121
 
 
122
# TODO: function to check whole file
 
123
 
 
124
# TODO: atomically append data, then measure backwards from the cursor
 
125
# position after writing to work out where it was located.  we may need to
 
126
# bypass python file buffering.
 
127
 
 
128
DATA_SUFFIX = '.knit'
 
129
INDEX_SUFFIX = '.kndx'
 
130
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024
 
131
 
 
132
 
 
133
class KnitError(InternalBzrError):
 
134
 
 
135
    _fmt = "Knit error"
 
136
 
 
137
 
 
138
class KnitCorrupt(KnitError):
 
139
 
 
140
    _fmt = "Knit %(filename)s corrupt: %(how)s"
 
141
 
 
142
    def __init__(self, filename, how):
 
143
        KnitError.__init__(self)
 
144
        self.filename = filename
 
145
        self.how = how
 
146
 
 
147
 
 
148
class SHA1KnitCorrupt(KnitCorrupt):
 
149
 
 
150
    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
 
151
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
 
152
            "sha %(actual)s")
 
153
 
 
154
    def __init__(self, filename, actual, expected, key, content):
 
155
        KnitError.__init__(self)
 
156
        self.filename = filename
 
157
        self.actual = actual
 
158
        self.expected = expected
 
159
        self.key = key
 
160
        self.content = content
 
161
 
 
162
 
 
163
class KnitDataStreamIncompatible(KnitError):
 
164
    # Not raised anymore, as we can convert data streams.  In future we may
 
165
    # need it again for more exotic cases, so we're keeping it around for now.
 
166
 
 
167
    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."
 
168
 
 
169
    def __init__(self, stream_format, target_format):
 
170
        self.stream_format = stream_format
 
171
        self.target_format = target_format
 
172
 
 
173
 
 
174
class KnitDataStreamUnknown(KnitError):
 
175
    # Indicates a data stream we don't know how to handle.
 
176
 
 
177
    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."
 
178
 
 
179
    def __init__(self, stream_format):
 
180
        self.stream_format = stream_format
 
181
 
 
182
 
 
183
class KnitHeaderError(KnitError):
 
184
 
 
185
    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'
 
186
 
 
187
    def __init__(self, badline, filename):
 
188
        KnitError.__init__(self)
 
189
        self.badline = badline
 
190
        self.filename = filename
 
191
 
 
192
 
 
193
class KnitIndexUnknownMethod(KnitError):
 
194
    """Raised when we don't understand the storage method.
 
195
 
 
196
    Currently only 'fulltext' and 'line-delta' are supported.
 
197
    """
 
198
 
 
199
    _fmt = ("Knit index %(filename)s does not have a known method"
 
200
            " in options: %(options)r")
 
201
 
 
202
    def __init__(self, filename, options):
 
203
        KnitError.__init__(self)
 
204
        self.filename = filename
 
205
        self.options = options
 
206
 
 
207
 
 
208
class KnitAdapter(object):
 
209
    """Base class for knit record adaption."""
 
210
 
 
211
    def __init__(self, basis_vf):
 
212
        """Create an adapter which accesses full texts from basis_vf.
 
213
 
 
214
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
215
            May be None for adapters that do not need to access basis texts.
 
216
        """
 
217
        self._data = KnitVersionedFiles(None, None)
 
218
        self._annotate_factory = KnitAnnotateFactory()
 
219
        self._plain_factory = KnitPlainFactory()
 
220
        self._basis_vf = basis_vf
 
221
 
 
222
 
 
223
class FTAnnotatedToUnannotated(KnitAdapter):
 
224
    """An adapter from FT annotated knits to unannotated ones."""
 
225
 
 
226
    def get_bytes(self, factory):
 
227
        annotated_compressed_bytes = factory._raw_record
 
228
        rec, contents = \
 
229
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
230
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
231
        size, bytes = self._data._record_to_data(
 
232
            (rec[1],), rec[3], content.text())
 
233
        return bytes
 
234
 
 
235
 
 
236
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
237
    """An adapter for deltas from annotated to unannotated."""
 
238
 
 
239
    def get_bytes(self, factory):
 
240
        annotated_compressed_bytes = factory._raw_record
 
241
        rec, contents = \
 
242
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
243
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
244
                                                        plain=True)
 
245
        contents = self._plain_factory.lower_line_delta(delta)
 
246
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
 
247
        return bytes
 
248
 
 
249
 
 
250
class FTAnnotatedToFullText(KnitAdapter):
 
251
    """An adapter from FT annotated knits to unannotated ones."""
 
252
 
 
253
    def get_bytes(self, factory):
 
254
        annotated_compressed_bytes = factory._raw_record
 
255
        rec, contents = \
 
256
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
257
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
 
258
                                                             contents, factory._build_details, None)
 
259
        return b''.join(content.text())
 
260
 
 
261
 
 
262
class DeltaAnnotatedToFullText(KnitAdapter):
 
263
    """An adapter for deltas from annotated to unannotated."""
 
264
 
 
265
    def get_bytes(self, factory):
 
266
        annotated_compressed_bytes = factory._raw_record
 
267
        rec, contents = \
 
268
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
269
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
270
                                                        plain=True)
 
271
        compression_parent = factory.parents[0]
 
272
        basis_entry = next(self._basis_vf.get_record_stream(
 
273
            [compression_parent], 'unordered', True))
 
274
        if basis_entry.storage_kind == 'absent':
 
275
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
276
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
277
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
278
        # Manually apply the delta because we have one annotated content and
 
279
        # one plain.
 
280
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
281
        basis_content.apply_delta(delta, rec[1])
 
282
        basis_content._should_strip_eol = factory._build_details[1]
 
283
        return b''.join(basis_content.text())
 
284
 
 
285
 
 
286
class FTPlainToFullText(KnitAdapter):
 
287
    """An adapter from FT plain knits to unannotated ones."""
 
288
 
 
289
    def get_bytes(self, factory):
 
290
        compressed_bytes = factory._raw_record
 
291
        rec, contents = \
 
292
            self._data._parse_record_unchecked(compressed_bytes)
 
293
        content, delta = self._plain_factory.parse_record(factory.key[-1],
 
294
                                                          contents, factory._build_details, None)
 
295
        return b''.join(content.text())
 
296
 
 
297
 
 
298
class DeltaPlainToFullText(KnitAdapter):
 
299
    """An adapter for deltas from annotated to unannotated."""
 
300
 
 
301
    def get_bytes(self, factory):
 
302
        compressed_bytes = factory._raw_record
 
303
        rec, contents = \
 
304
            self._data._parse_record_unchecked(compressed_bytes)
 
305
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
306
        compression_parent = factory.parents[0]
 
307
        # XXX: string splitting overhead.
 
308
        basis_entry = next(self._basis_vf.get_record_stream(
 
309
            [compression_parent], 'unordered', True))
 
310
        if basis_entry.storage_kind == 'absent':
 
311
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
312
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
313
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
314
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
315
        # Manually apply the delta because we have one annotated content and
 
316
        # one plain.
 
317
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
318
                                                      factory._build_details, basis_content)
 
319
        return b''.join(content.text())
 
320
 
 
321
 
 
322
class KnitContentFactory(ContentFactory):
 
323
    """Content factory for streaming from knits.
 
324
 
 
325
    :seealso ContentFactory:
 
326
    """
 
327
 
 
328
    def __init__(self, key, parents, build_details, sha1, raw_record,
 
329
                 annotated, knit=None, network_bytes=None):
 
330
        """Create a KnitContentFactory for key.
 
331
 
 
332
        :param key: The key.
 
333
        :param parents: The parents.
 
334
        :param build_details: The build details as returned from
 
335
            get_build_details.
 
336
        :param sha1: The sha1 expected from the full text of this object.
 
337
        :param raw_record: The bytes of the knit data from disk.
 
338
        :param annotated: True if the raw data is annotated.
 
339
        :param network_bytes: None to calculate the network bytes on demand,
 
340
            not-none if they are already known.
 
341
        """
 
342
        ContentFactory.__init__(self)
 
343
        self.sha1 = sha1
 
344
        self.key = key
 
345
        self.parents = parents
 
346
        if build_details[0] == 'line-delta':
 
347
            kind = 'delta'
 
348
        else:
 
349
            kind = 'ft'
 
350
        if annotated:
 
351
            annotated_kind = 'annotated-'
 
352
        else:
 
353
            annotated_kind = ''
 
354
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
355
        self._raw_record = raw_record
 
356
        self._network_bytes = network_bytes
 
357
        self._build_details = build_details
 
358
        self._knit = knit
 
359
 
 
360
    def _create_network_bytes(self):
 
361
        """Create a fully serialised network version for transmission."""
 
362
        # storage_kind, key, parents, Noeol, raw_record
 
363
        key_bytes = b'\x00'.join(self.key)
 
364
        if self.parents is None:
 
365
            parent_bytes = b'None:'
 
366
        else:
 
367
            parent_bytes = b'\t'.join(b'\x00'.join(key)
 
368
                                      for key in self.parents)
 
369
        if self._build_details[1]:
 
370
            noeol = b'N'
 
371
        else:
 
372
            noeol = b' '
 
373
        network_bytes = b"%s\n%s\n%s\n%s%s" % (
 
374
            self.storage_kind.encode('ascii'), key_bytes,
 
375
            parent_bytes, noeol, self._raw_record)
 
376
        self._network_bytes = network_bytes
 
377
 
 
378
    def get_bytes_as(self, storage_kind):
 
379
        if storage_kind == self.storage_kind:
 
380
            if self._network_bytes is None:
 
381
                self._create_network_bytes()
 
382
            return self._network_bytes
 
383
        if ('-ft-' in self.storage_kind
 
384
                and storage_kind in ('chunked', 'fulltext')):
 
385
            adapter_key = (self.storage_kind, 'fulltext')
 
386
            adapter_factory = adapter_registry.get(adapter_key)
 
387
            adapter = adapter_factory(None)
 
388
            bytes = adapter.get_bytes(self)
 
389
            if storage_kind == 'chunked':
 
390
                return [bytes]
 
391
            else:
 
392
                return bytes
 
393
        if self._knit is not None:
 
394
            # Not redundant with direct conversion above - that only handles
 
395
            # fulltext cases.
 
396
            if storage_kind == 'chunked':
 
397
                return self._knit.get_lines(self.key[0])
 
398
            elif storage_kind == 'fulltext':
 
399
                return self._knit.get_text(self.key[0])
 
400
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
401
                                               self.storage_kind)
 
402
 
 
403
 
 
404
class LazyKnitContentFactory(ContentFactory):
 
405
    """A ContentFactory which can either generate full text or a wire form.
 
406
 
 
407
    :seealso ContentFactory:
 
408
    """
 
409
 
 
410
    def __init__(self, key, parents, generator, first):
 
411
        """Create a LazyKnitContentFactory.
 
412
 
 
413
        :param key: The key of the record.
 
414
        :param parents: The parents of the record.
 
415
        :param generator: A _ContentMapGenerator containing the record for this
 
416
            key.
 
417
        :param first: Is this the first content object returned from generator?
 
418
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
419
            knit-delta-closure-ref
 
420
        """
 
421
        self.key = key
 
422
        self.parents = parents
 
423
        self.sha1 = None
 
424
        self._generator = generator
 
425
        self.storage_kind = "knit-delta-closure"
 
426
        if not first:
 
427
            self.storage_kind = self.storage_kind + "-ref"
 
428
        self._first = first
 
429
 
 
430
    def get_bytes_as(self, storage_kind):
 
431
        if storage_kind == self.storage_kind:
 
432
            if self._first:
 
433
                return self._generator._wire_bytes()
 
434
            else:
 
435
                # all the keys etc are contained in the bytes returned in the
 
436
                # first record.
 
437
                return b''
 
438
        if storage_kind in ('chunked', 'fulltext'):
 
439
            chunks = self._generator._get_one_work(self.key).text()
 
440
            if storage_kind == 'chunked':
 
441
                return chunks
 
442
            else:
 
443
                return b''.join(chunks)
 
444
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
445
                                               self.storage_kind)
 
446
 
 
447
 
 
448
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
449
    """Convert a network record to a iterator over stream records.
 
450
 
 
451
    :param storage_kind: The storage kind of the record.
 
452
        Must be 'knit-delta-closure'.
 
453
    :param bytes: The bytes of the record on the network.
 
454
    """
 
455
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
456
    return generator.get_record_stream()
 
457
 
 
458
 
 
459
def knit_network_to_record(storage_kind, bytes, line_end):
 
460
    """Convert a network record to a record object.
 
461
 
 
462
    :param storage_kind: The storage kind of the record.
 
463
    :param bytes: The bytes of the record on the network.
 
464
    """
 
465
    start = line_end
 
466
    line_end = bytes.find(b'\n', start)
 
467
    key = tuple(bytes[start:line_end].split(b'\x00'))
 
468
    start = line_end + 1
 
469
    line_end = bytes.find(b'\n', start)
 
470
    parent_line = bytes[start:line_end]
 
471
    if parent_line == b'None:':
 
472
        parents = None
 
473
    else:
 
474
        parents = tuple(
 
475
            [tuple(segment.split(b'\x00')) for segment in parent_line.split(b'\t')
 
476
             if segment])
 
477
    start = line_end + 1
 
478
    noeol = bytes[start:start + 1] == b'N'
 
479
    if 'ft' in storage_kind:
 
480
        method = 'fulltext'
 
481
    else:
 
482
        method = 'line-delta'
 
483
    build_details = (method, noeol)
 
484
    start = start + 1
 
485
    raw_record = bytes[start:]
 
486
    annotated = 'annotated' in storage_kind
 
487
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
488
                               annotated, network_bytes=bytes)]
 
489
 
 
490
 
 
491
class KnitContent(object):
 
492
    """Content of a knit version to which deltas can be applied.
 
493
 
 
494
    This is always stored in memory as a list of lines with \\n at the end,
 
495
    plus a flag saying if the final ending is really there or not, because that
 
496
    corresponds to the on-disk knit representation.
 
497
    """
 
498
 
 
499
    def __init__(self):
 
500
        self._should_strip_eol = False
 
501
 
 
502
    def apply_delta(self, delta, new_version_id):
 
503
        """Apply delta to this object to become new_version_id."""
 
504
        raise NotImplementedError(self.apply_delta)
 
505
 
 
506
    def line_delta_iter(self, new_lines):
 
507
        """Generate line-based delta from this content to new_lines."""
 
508
        new_texts = new_lines.text()
 
509
        old_texts = self.text()
 
510
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
 
511
        for tag, i1, i2, j1, j2 in s.get_opcodes():
 
512
            if tag == 'equal':
 
513
                continue
 
514
            # ofrom, oto, length, data
 
515
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
 
516
 
 
517
    def line_delta(self, new_lines):
 
518
        return list(self.line_delta_iter(new_lines))
 
519
 
 
520
    @staticmethod
 
521
    def get_line_delta_blocks(knit_delta, source, target):
 
522
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
 
523
        target_len = len(target)
 
524
        s_pos = 0
 
525
        t_pos = 0
 
526
        for s_begin, s_end, t_len, new_text in knit_delta:
 
527
            true_n = s_begin - s_pos
 
528
            n = true_n
 
529
            if n > 0:
 
530
                # knit deltas do not provide reliable info about whether the
 
531
                # last line of a file matches, due to eol handling.
 
532
                if source[s_pos + n - 1] != target[t_pos + n - 1]:
 
533
                    n -= 1
 
534
                if n > 0:
 
535
                    yield s_pos, t_pos, n
 
536
            t_pos += t_len + true_n
 
537
            s_pos = s_end
 
538
        n = target_len - t_pos
 
539
        if n > 0:
 
540
            if source[s_pos + n - 1] != target[t_pos + n - 1]:
 
541
                n -= 1
 
542
            if n > 0:
 
543
                yield s_pos, t_pos, n
 
544
        yield s_pos + (target_len - t_pos), target_len, 0
 
545
 
 
546
 
 
547
class AnnotatedKnitContent(KnitContent):
 
548
    """Annotated content."""
 
549
 
 
550
    def __init__(self, lines):
 
551
        KnitContent.__init__(self)
 
552
        self._lines = list(lines)
 
553
 
 
554
    def annotate(self):
 
555
        """Return a list of (origin, text) for each content line."""
 
556
        lines = self._lines[:]
 
557
        if self._should_strip_eol:
 
558
            origin, last_line = lines[-1]
 
559
            lines[-1] = (origin, last_line.rstrip(b'\n'))
 
560
        return lines
 
561
 
 
562
    def apply_delta(self, delta, new_version_id):
 
563
        """Apply delta to this object to become new_version_id."""
 
564
        offset = 0
 
565
        lines = self._lines
 
566
        for start, end, count, delta_lines in delta:
 
567
            lines[offset + start:offset + end] = delta_lines
 
568
            offset = offset + (start - end) + count
 
569
 
 
570
    def text(self):
 
571
        try:
 
572
            lines = [text for origin, text in self._lines]
 
573
        except ValueError as e:
 
574
            # most commonly (only?) caused by the internal form of the knit
 
575
            # missing annotation information because of a bug - see thread
 
576
            # around 20071015
 
577
            raise KnitCorrupt(self,
 
578
                              "line in annotated knit missing annotation information: %s"
 
579
                              % (e,))
 
580
        if self._should_strip_eol:
 
581
            lines[-1] = lines[-1].rstrip(b'\n')
 
582
        return lines
 
583
 
 
584
    def copy(self):
 
585
        return AnnotatedKnitContent(self._lines)
 
586
 
 
587
 
 
588
class PlainKnitContent(KnitContent):
 
589
    """Unannotated content.
 
590
 
 
591
    When annotate[_iter] is called on this content, the same version is reported
 
592
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
 
593
    objects.
 
594
    """
 
595
 
 
596
    def __init__(self, lines, version_id):
 
597
        KnitContent.__init__(self)
 
598
        self._lines = lines
 
599
        self._version_id = version_id
 
600
 
 
601
    def annotate(self):
 
602
        """Return a list of (origin, text) for each content line."""
 
603
        return [(self._version_id, line) for line in self._lines]
 
604
 
 
605
    def apply_delta(self, delta, new_version_id):
 
606
        """Apply delta to this object to become new_version_id."""
 
607
        offset = 0
 
608
        lines = self._lines
 
609
        for start, end, count, delta_lines in delta:
 
610
            lines[offset + start:offset + end] = delta_lines
 
611
            offset = offset + (start - end) + count
 
612
        self._version_id = new_version_id
 
613
 
 
614
    def copy(self):
 
615
        return PlainKnitContent(self._lines[:], self._version_id)
 
616
 
 
617
    def text(self):
 
618
        lines = self._lines
 
619
        if self._should_strip_eol:
 
620
            lines = lines[:]
 
621
            lines[-1] = lines[-1].rstrip(b'\n')
 
622
        return lines
 
623
 
 
624
 
 
625
class _KnitFactory(object):
 
626
    """Base class for common Factory functions."""
 
627
 
 
628
    def parse_record(self, version_id, record, record_details,
 
629
                     base_content, copy_base_content=True):
 
630
        """Parse a record into a full content object.
 
631
 
 
632
        :param version_id: The official version id for this content
 
633
        :param record: The data returned by read_records_iter()
 
634
        :param record_details: Details about the record returned by
 
635
            get_build_details
 
636
        :param base_content: If get_build_details returns a compression_parent,
 
637
            you must return a base_content here, else use None
 
638
        :param copy_base_content: When building from the base_content, decide
 
639
            you can either copy it and return a new object, or modify it in
 
640
            place.
 
641
        :return: (content, delta) A Content object and possibly a line-delta,
 
642
            delta may be None
 
643
        """
 
644
        method, noeol = record_details
 
645
        if method == 'line-delta':
 
646
            if copy_base_content:
 
647
                content = base_content.copy()
 
648
            else:
 
649
                content = base_content
 
650
            delta = self.parse_line_delta(record, version_id)
 
651
            content.apply_delta(delta, version_id)
 
652
        else:
 
653
            content = self.parse_fulltext(record, version_id)
 
654
            delta = None
 
655
        content._should_strip_eol = noeol
 
656
        return (content, delta)
 
657
 
 
658
 
 
659
class KnitAnnotateFactory(_KnitFactory):
 
660
    """Factory for creating annotated Content objects."""
 
661
 
 
662
    annotated = True
 
663
 
 
664
    def make(self, lines, version_id):
 
665
        num_lines = len(lines)
 
666
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
 
667
 
 
668
    def parse_fulltext(self, content, version_id):
 
669
        """Convert fulltext to internal representation
 
670
 
 
671
        fulltext content is of the format
 
672
        revid(utf8) plaintext\n
 
673
        internal representation is of the format:
 
674
        (revid, plaintext)
 
675
        """
 
676
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
 
677
        #       but the code itself doesn't really depend on that.
 
678
        #       Figure out a way to not require the overhead of turning the
 
679
        #       list back into tuples.
 
680
        lines = (tuple(line.split(b' ', 1)) for line in content)
 
681
        return AnnotatedKnitContent(lines)
 
682
 
 
683
    def parse_line_delta_iter(self, lines):
 
684
        return iter(self.parse_line_delta(lines))
 
685
 
 
686
    def parse_line_delta(self, lines, version_id, plain=False):
 
687
        """Convert a line based delta into internal representation.
 
688
 
 
689
        line delta is in the form of:
 
690
        intstart intend intcount
 
691
        1..count lines:
 
692
        revid(utf8) newline\n
 
693
        internal representation is
 
694
        (start, end, count, [1..count tuples (revid, newline)])
 
695
 
 
696
        :param plain: If True, the lines are returned as a plain
 
697
            list without annotations, not as a list of (origin, content) tuples, i.e.
 
698
            (start, end, count, [1..count newline])
 
699
        """
 
700
        result = []
 
701
        lines = iter(lines)
 
702
 
 
703
        cache = {}
 
704
 
 
705
        def cache_and_return(line):
 
706
            origin, text = line.split(b' ', 1)
 
707
            return cache.setdefault(origin, origin), text
 
708
 
 
709
        # walk through the lines parsing.
 
710
        # Note that the plain test is explicitly pulled out of the
 
711
        # loop to minimise any performance impact
 
712
        if plain:
 
713
            for header in lines:
 
714
                start, end, count = [int(n) for n in header.split(b',')]
 
715
                contents = [next(lines).split(b' ', 1)[1]
 
716
                            for _ in range(count)]
 
717
                result.append((start, end, count, contents))
 
718
        else:
 
719
            for header in lines:
 
720
                start, end, count = [int(n) for n in header.split(b',')]
 
721
                contents = [tuple(next(lines).split(b' ', 1))
 
722
                            for _ in range(count)]
 
723
                result.append((start, end, count, contents))
 
724
        return result
 
725
 
 
726
    def get_fulltext_content(self, lines):
 
727
        """Extract just the content lines from a fulltext."""
 
728
        return (line.split(b' ', 1)[1] for line in lines)
 
729
 
 
730
    def get_linedelta_content(self, lines):
 
731
        """Extract just the content from a line delta.
 
732
 
 
733
        This doesn't return all of the extra information stored in a delta.
 
734
        Only the actual content lines.
 
735
        """
 
736
        lines = iter(lines)
 
737
        for header in lines:
 
738
            header = header.split(b',')
 
739
            count = int(header[2])
 
740
            for _ in range(count):
 
741
                origin, text = next(lines).split(b' ', 1)
 
742
                yield text
 
743
 
 
744
    def lower_fulltext(self, content):
 
745
        """convert a fulltext content record into a serializable form.
 
746
 
 
747
        see parse_fulltext which this inverts.
 
748
        """
 
749
        return [b'%s %s' % (o, t) for o, t in content._lines]
 
750
 
 
751
    def lower_line_delta(self, delta):
 
752
        """convert a delta into a serializable form.
 
753
 
 
754
        See parse_line_delta which this inverts.
 
755
        """
 
756
        # TODO: jam 20070209 We only do the caching thing to make sure that
 
757
        #       the origin is a valid utf-8 line, eventually we could remove it
 
758
        out = []
 
759
        for start, end, c, lines in delta:
 
760
            out.append(b'%d,%d,%d\n' % (start, end, c))
 
761
            out.extend(origin + b' ' + text
 
762
                       for origin, text in lines)
 
763
        return out
 
764
 
 
765
    def annotate(self, knit, key):
 
766
        content = knit._get_content(key)
 
767
        # adjust for the fact that serialised annotations are only key suffixes
 
768
        # for this factory.
 
769
        if isinstance(key, tuple):
 
770
            prefix = key[:-1]
 
771
            origins = content.annotate()
 
772
            result = []
 
773
            for origin, line in origins:
 
774
                result.append((prefix + (origin,), line))
 
775
            return result
 
776
        else:
 
777
            # XXX: This smells a bit.  Why would key ever be a non-tuple here?
 
778
            # Aren't keys defined to be tuples?  -- spiv 20080618
 
779
            return content.annotate()
 
780
 
 
781
 
 
782
class KnitPlainFactory(_KnitFactory):
 
783
    """Factory for creating plain Content objects."""
 
784
 
 
785
    annotated = False
 
786
 
 
787
    def make(self, lines, version_id):
 
788
        return PlainKnitContent(lines, version_id)
 
789
 
 
790
    def parse_fulltext(self, content, version_id):
 
791
        """This parses an unannotated fulltext.
 
792
 
 
793
        Note that this is not a noop - the internal representation
 
794
        has (versionid, line) - its just a constant versionid.
 
795
        """
 
796
        return self.make(content, version_id)
 
797
 
 
798
    def parse_line_delta_iter(self, lines, version_id):
 
799
        cur = 0
 
800
        num_lines = len(lines)
 
801
        while cur < num_lines:
 
802
            header = lines[cur]
 
803
            cur += 1
 
804
            start, end, c = [int(n) for n in header.split(b',')]
 
805
            yield start, end, c, lines[cur:cur + c]
 
806
            cur += c
 
807
 
 
808
    def parse_line_delta(self, lines, version_id):
 
809
        return list(self.parse_line_delta_iter(lines, version_id))
 
810
 
 
811
    def get_fulltext_content(self, lines):
 
812
        """Extract just the content lines from a fulltext."""
 
813
        return iter(lines)
 
814
 
 
815
    def get_linedelta_content(self, lines):
 
816
        """Extract just the content from a line delta.
 
817
 
 
818
        This doesn't return all of the extra information stored in a delta.
 
819
        Only the actual content lines.
 
820
        """
 
821
        lines = iter(lines)
 
822
        for header in lines:
 
823
            header = header.split(b',')
 
824
            count = int(header[2])
 
825
            for _ in range(count):
 
826
                yield next(lines)
 
827
 
 
828
    def lower_fulltext(self, content):
 
829
        return content.text()
 
830
 
 
831
    def lower_line_delta(self, delta):
 
832
        out = []
 
833
        for start, end, c, lines in delta:
 
834
            out.append(b'%d,%d,%d\n' % (start, end, c))
 
835
            out.extend(lines)
 
836
        return out
 
837
 
 
838
    def annotate(self, knit, key):
 
839
        annotator = _KnitAnnotator(knit)
 
840
        return annotator.annotate_flat(key)
 
841
 
 
842
 
 
843
def make_file_factory(annotated, mapper):
 
844
    """Create a factory for creating a file based KnitVersionedFiles.
 
845
 
 
846
    This is only functional enough to run interface tests, it doesn't try to
 
847
    provide a full pack environment.
 
848
 
 
849
    :param annotated: knit annotations are wanted.
 
850
    :param mapper: The mapper from keys to paths.
 
851
    """
 
852
    def factory(transport):
 
853
        index = _KndxIndex(transport, mapper, lambda: None,
 
854
                           lambda: True, lambda: True)
 
855
        access = _KnitKeyAccess(transport, mapper)
 
856
        return KnitVersionedFiles(index, access, annotated=annotated)
 
857
    return factory
 
858
 
 
859
 
 
860
def make_pack_factory(graph, delta, keylength):
 
861
    """Create a factory for creating a pack based VersionedFiles.
 
862
 
 
863
    This is only functional enough to run interface tests, it doesn't try to
 
864
    provide a full pack environment.
 
865
 
 
866
    :param graph: Store a graph.
 
867
    :param delta: Delta compress contents.
 
868
    :param keylength: How long should keys be.
 
869
    """
 
870
    def factory(transport):
 
871
        parents = graph or delta
 
872
        ref_length = 0
 
873
        if graph:
 
874
            ref_length += 1
 
875
        if delta:
 
876
            ref_length += 1
 
877
            max_delta_chain = 200
 
878
        else:
 
879
            max_delta_chain = 0
 
880
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
 
881
                                                    key_elements=keylength)
 
882
        stream = transport.open_write_stream('newpack')
 
883
        writer = pack.ContainerWriter(stream.write)
 
884
        writer.begin()
 
885
        index = _KnitGraphIndex(graph_index, lambda: True, parents=parents,
 
886
                                deltas=delta, add_callback=graph_index.add_nodes)
 
887
        access = pack_repo._DirectPackAccess({})
 
888
        access.set_writer(writer, graph_index, (transport, 'newpack'))
 
889
        result = KnitVersionedFiles(index, access,
 
890
                                    max_delta_chain=max_delta_chain)
 
891
        result.stream = stream
 
892
        result.writer = writer
 
893
        return result
 
894
    return factory
 
895
 
 
896
 
 
897
def cleanup_pack_knit(versioned_files):
 
898
    versioned_files.stream.close()
 
899
    versioned_files.writer.end()
 
900
 
 
901
 
 
902
def _get_total_build_size(self, keys, positions):
 
903
    """Determine the total bytes to build these keys.
 
904
 
 
905
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
906
    don't inherit from a common base.)
 
907
 
 
908
    :param keys: Keys that we want to build
 
909
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
910
        as returned by _get_components_positions)
 
911
    :return: Number of bytes to build those keys
 
912
    """
 
913
    all_build_index_memos = {}
 
914
    build_keys = keys
 
915
    while build_keys:
 
916
        next_keys = set()
 
917
        for key in build_keys:
 
918
            # This is mostly for the 'stacked' case
 
919
            # Where we will be getting the data from a fallback
 
920
            if key not in positions:
 
921
                continue
 
922
            _, index_memo, compression_parent = positions[key]
 
923
            all_build_index_memos[key] = index_memo
 
924
            if compression_parent not in all_build_index_memos:
 
925
                next_keys.add(compression_parent)
 
926
        build_keys = next_keys
 
927
    return sum(index_memo[2]
 
928
               for index_memo in viewvalues(all_build_index_memos))
 
929
 
 
930
 
 
931
class KnitVersionedFiles(VersionedFilesWithFallbacks):
 
932
    """Storage for many versioned files using knit compression.
 
933
 
 
934
    Backend storage is managed by indices and data objects.
 
935
 
 
936
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
937
        parents, graph, compression and data location of entries in this
 
938
        KnitVersionedFiles.  Note that this is only the index for
 
939
        *this* vfs; if there are fallbacks they must be queried separately.
 
940
    """
 
941
 
 
942
    def __init__(self, index, data_access, max_delta_chain=200,
 
943
                 annotated=False, reload_func=None):
 
944
        """Create a KnitVersionedFiles with index and data_access.
 
945
 
 
946
        :param index: The index for the knit data.
 
947
        :param data_access: The access object to store and retrieve knit
 
948
            records.
 
949
        :param max_delta_chain: The maximum number of deltas to permit during
 
950
            insertion. Set to 0 to prohibit the use of deltas.
 
951
        :param annotated: Set to True to cause annotations to be calculated and
 
952
            stored during insertion.
 
953
        :param reload_func: An function that can be called if we think we need
 
954
            to reload the pack listing and try again. See
 
955
            'breezy.bzr.pack_repo.AggregateIndex' for the signature.
 
956
        """
 
957
        self._index = index
 
958
        self._access = data_access
 
959
        self._max_delta_chain = max_delta_chain
 
960
        if annotated:
 
961
            self._factory = KnitAnnotateFactory()
 
962
        else:
 
963
            self._factory = KnitPlainFactory()
 
964
        self._immediate_fallback_vfs = []
 
965
        self._reload_func = reload_func
 
966
 
 
967
    def __repr__(self):
 
968
        return "%s(%r, %r)" % (
 
969
            self.__class__.__name__,
 
970
            self._index,
 
971
            self._access)
 
972
 
 
973
    def without_fallbacks(self):
 
974
        """Return a clone of this object without any fallbacks configured."""
 
975
        return KnitVersionedFiles(self._index, self._access,
 
976
                                  self._max_delta_chain, self._factory.annotated,
 
977
                                  self._reload_func)
 
978
 
 
979
    def add_fallback_versioned_files(self, a_versioned_files):
 
980
        """Add a source of texts for texts not present in this knit.
 
981
 
 
982
        :param a_versioned_files: A VersionedFiles object.
 
983
        """
 
984
        self._immediate_fallback_vfs.append(a_versioned_files)
 
985
 
 
986
    def add_lines(self, key, parents, lines, parent_texts=None,
 
987
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
 
988
                  check_content=True):
 
989
        """See VersionedFiles.add_lines()."""
 
990
        self._index._check_write_ok()
 
991
        self._check_add(key, lines, random_id, check_content)
 
992
        if parents is None:
 
993
            # The caller might pass None if there is no graph data, but kndx
 
994
            # indexes can't directly store that, so we give them
 
995
            # an empty tuple instead.
 
996
            parents = ()
 
997
        line_bytes = b''.join(lines)
 
998
        return self._add(key, lines, parents,
 
999
                         parent_texts, left_matching_blocks, nostore_sha, random_id,
 
1000
                         line_bytes=line_bytes)
 
1001
 
 
1002
    def _add(self, key, lines, parents, parent_texts,
 
1003
             left_matching_blocks, nostore_sha, random_id,
 
1004
             line_bytes):
 
1005
        """Add a set of lines on top of version specified by parents.
 
1006
 
 
1007
        Any versions not present will be converted into ghosts.
 
1008
 
 
1009
        :param lines: A list of strings where each one is a single line (has a
 
1010
            single newline at the end of the string) This is now optional
 
1011
            (callers can pass None). It is left in its location for backwards
 
1012
            compatibility. It should ''.join(lines) must == line_bytes
 
1013
        :param line_bytes: A single string containing the content
 
1014
 
 
1015
        We pass both lines and line_bytes because different routes bring the
 
1016
        values to this function. And for memory efficiency, we don't want to
 
1017
        have to split/join on-demand.
 
1018
        """
 
1019
        # first thing, if the content is something we don't need to store, find
 
1020
        # that out.
 
1021
        digest = sha_string(line_bytes)
 
1022
        if nostore_sha == digest:
 
1023
            raise errors.ExistingContent
 
1024
 
 
1025
        present_parents = []
 
1026
        if parent_texts is None:
 
1027
            parent_texts = {}
 
1028
        # Do a single query to ascertain parent presence; we only compress
 
1029
        # against parents in the same kvf.
 
1030
        present_parent_map = self._index.get_parent_map(parents)
 
1031
        for parent in parents:
 
1032
            if parent in present_parent_map:
 
1033
                present_parents.append(parent)
 
1034
 
 
1035
        # Currently we can only compress against the left most present parent.
 
1036
        if (len(present_parents) == 0
 
1037
                or present_parents[0] != parents[0]):
 
1038
            delta = False
 
1039
        else:
 
1040
            # To speed the extract of texts the delta chain is limited
 
1041
            # to a fixed number of deltas.  This should minimize both
 
1042
            # I/O and the time spend applying deltas.
 
1043
            delta = self._check_should_delta(present_parents[0])
 
1044
 
 
1045
        text_length = len(line_bytes)
 
1046
        options = []
 
1047
        no_eol = False
 
1048
        # Note: line_bytes is not modified to add a newline, that is tracked
 
1049
        #       via the no_eol flag. 'lines' *is* modified, because that is the
 
1050
        #       general values needed by the Content code.
 
1051
        if line_bytes and not line_bytes.endswith(b'\n'):
 
1052
            options.append(b'no-eol')
 
1053
            no_eol = True
 
1054
            # Copy the existing list, or create a new one
 
1055
            if lines is None:
 
1056
                lines = osutils.split_lines(line_bytes)
 
1057
            else:
 
1058
                lines = lines[:]
 
1059
            # Replace the last line with one that ends in a final newline
 
1060
            lines[-1] = lines[-1] + b'\n'
 
1061
        if lines is None:
 
1062
            lines = osutils.split_lines(line_bytes)
 
1063
 
 
1064
        for element in key[:-1]:
 
1065
            if not isinstance(element, bytes):
 
1066
                raise TypeError("key contains non-bytestrings: %r" % (key,))
 
1067
        if key[-1] is None:
 
1068
            key = key[:-1] + (b'sha1:' + digest,)
 
1069
        elif not isinstance(key[-1], bytes):
 
1070
            raise TypeError("key contains non-bytestrings: %r" % (key,))
 
1071
        # Knit hunks are still last-element only
 
1072
        version_id = key[-1]
 
1073
        content = self._factory.make(lines, version_id)
 
1074
        if no_eol:
 
1075
            # Hint to the content object that its text() call should strip the
 
1076
            # EOL.
 
1077
            content._should_strip_eol = True
 
1078
        if delta or (self._factory.annotated and len(present_parents) > 0):
 
1079
            # Merge annotations from parent texts if needed.
 
1080
            delta_hunks = self._merge_annotations(content, present_parents,
 
1081
                                                  parent_texts, delta, self._factory.annotated,
 
1082
                                                  left_matching_blocks)
 
1083
 
 
1084
        if delta:
 
1085
            options.append(b'line-delta')
 
1086
            store_lines = self._factory.lower_line_delta(delta_hunks)
 
1087
            size, data = self._record_to_data(key, digest,
 
1088
                                              store_lines)
 
1089
        else:
 
1090
            options.append(b'fulltext')
 
1091
            # isinstance is slower and we have no hierarchy.
 
1092
            if self._factory.__class__ is KnitPlainFactory:
 
1093
                # Use the already joined bytes saving iteration time in
 
1094
                # _record_to_data.
 
1095
                dense_lines = [line_bytes]
 
1096
                if no_eol:
 
1097
                    dense_lines.append(b'\n')
 
1098
                size, data = self._record_to_data(key, digest,
 
1099
                                                  lines, dense_lines)
 
1100
            else:
 
1101
                # get mixed annotation + content and feed it into the
 
1102
                # serialiser.
 
1103
                store_lines = self._factory.lower_fulltext(content)
 
1104
                size, data = self._record_to_data(key, digest,
 
1105
                                                  store_lines)
 
1106
 
 
1107
        access_memo = self._access.add_raw_records([(key, size)], data)[0]
 
1108
        self._index.add_records(
 
1109
            ((key, options, access_memo, parents),),
 
1110
            random_id=random_id)
 
1111
        return digest, text_length, content
 
1112
 
 
1113
    def annotate(self, key):
 
1114
        """See VersionedFiles.annotate."""
 
1115
        return self._factory.annotate(self, key)
 
1116
 
 
1117
    def get_annotator(self):
 
1118
        return _KnitAnnotator(self)
 
1119
 
 
1120
    def check(self, progress_bar=None, keys=None):
 
1121
        """See VersionedFiles.check()."""
 
1122
        if keys is None:
 
1123
            return self._logical_check()
 
1124
        else:
 
1125
            # At the moment, check does not extra work over get_record_stream
 
1126
            return self.get_record_stream(keys, 'unordered', True)
 
1127
 
 
1128
    def _logical_check(self):
 
1129
        # This doesn't actually test extraction of everything, but that will
 
1130
        # impact 'bzr check' substantially, and needs to be integrated with
 
1131
        # care. However, it does check for the obvious problem of a delta with
 
1132
        # no basis.
 
1133
        keys = self._index.keys()
 
1134
        parent_map = self.get_parent_map(keys)
 
1135
        for key in keys:
 
1136
            if self._index.get_method(key) != 'fulltext':
 
1137
                compression_parent = parent_map[key][0]
 
1138
                if compression_parent not in parent_map:
 
1139
                    raise KnitCorrupt(self,
 
1140
                                      "Missing basis parent %s for %s" % (
 
1141
                                          compression_parent, key))
 
1142
        for fallback_vfs in self._immediate_fallback_vfs:
 
1143
            fallback_vfs.check()
 
1144
 
 
1145
    def _check_add(self, key, lines, random_id, check_content):
 
1146
        """check that version_id and lines are safe to add."""
 
1147
        if not all(isinstance(x, bytes) or x is None for x in key):
 
1148
            raise TypeError(key)
 
1149
        version_id = key[-1]
 
1150
        if version_id is not None:
 
1151
            if contains_whitespace(version_id):
 
1152
                raise InvalidRevisionId(version_id, self)
 
1153
            self.check_not_reserved_id(version_id)
 
1154
        # TODO: If random_id==False and the key is already present, we should
 
1155
        # probably check that the existing content is identical to what is
 
1156
        # being inserted, and otherwise raise an exception.  This would make
 
1157
        # the bundle code simpler.
 
1158
        if check_content:
 
1159
            self._check_lines_not_unicode(lines)
 
1160
            self._check_lines_are_lines(lines)
 
1161
 
 
1162
    def _check_header(self, key, line):
 
1163
        rec = self._split_header(line)
 
1164
        self._check_header_version(rec, key[-1])
 
1165
        return rec
 
1166
 
 
1167
    def _check_header_version(self, rec, version_id):
 
1168
        """Checks the header version on original format knit records.
 
1169
 
 
1170
        These have the last component of the key embedded in the record.
 
1171
        """
 
1172
        if rec[1] != version_id:
 
1173
            raise KnitCorrupt(self,
 
1174
                              'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
 
1175
 
 
1176
    def _check_should_delta(self, parent):
 
1177
        """Iterate back through the parent listing, looking for a fulltext.
 
1178
 
 
1179
        This is used when we want to decide whether to add a delta or a new
 
1180
        fulltext. It searches for _max_delta_chain parents. When it finds a
 
1181
        fulltext parent, it sees if the total size of the deltas leading up to
 
1182
        it is large enough to indicate that we want a new full text anyway.
 
1183
 
 
1184
        Return True if we should create a new delta, False if we should use a
 
1185
        full text.
 
1186
        """
 
1187
        delta_size = 0
 
1188
        fulltext_size = None
 
1189
        for count in range(self._max_delta_chain):
 
1190
            try:
 
1191
                # Note that this only looks in the index of this particular
 
1192
                # KnitVersionedFiles, not in the fallbacks.  This ensures that
 
1193
                # we won't store a delta spanning physical repository
 
1194
                # boundaries.
 
1195
                build_details = self._index.get_build_details([parent])
 
1196
                parent_details = build_details[parent]
 
1197
            except (RevisionNotPresent, KeyError) as e:
 
1198
                # Some basis is not locally present: always fulltext
 
1199
                return False
 
1200
            index_memo, compression_parent, _, _ = parent_details
 
1201
            _, _, size = index_memo
 
1202
            if compression_parent is None:
 
1203
                fulltext_size = size
 
1204
                break
 
1205
            delta_size += size
 
1206
            # We don't explicitly check for presence because this is in an
 
1207
            # inner loop, and if it's missing it'll fail anyhow.
 
1208
            parent = compression_parent
 
1209
        else:
 
1210
            # We couldn't find a fulltext, so we must create a new one
 
1211
            return False
 
1212
        # Simple heuristic - if the total I/O wold be greater as a delta than
 
1213
        # the originally installed fulltext, we create a new fulltext.
 
1214
        return fulltext_size > delta_size
 
1215
 
 
1216
    def _build_details_to_components(self, build_details):
 
1217
        """Convert a build_details tuple to a position tuple."""
 
1218
        # record_details, access_memo, compression_parent
 
1219
        return build_details[3], build_details[0], build_details[1]
 
1220
 
 
1221
    def _get_components_positions(self, keys, allow_missing=False):
 
1222
        """Produce a map of position data for the components of keys.
 
1223
 
 
1224
        This data is intended to be used for retrieving the knit records.
 
1225
 
 
1226
        A dict of key to (record_details, index_memo, next, parents) is
 
1227
        returned.
 
1228
 
 
1229
        * method is the way referenced data should be applied.
 
1230
        * index_memo is the handle to pass to the data access to actually get
 
1231
          the data
 
1232
        * next is the build-parent of the version, or None for fulltexts.
 
1233
        * parents is the version_ids of the parents of this version
 
1234
 
 
1235
        :param allow_missing: If True do not raise an error on a missing
 
1236
            component, just ignore it.
 
1237
        """
 
1238
        component_data = {}
 
1239
        pending_components = keys
 
1240
        while pending_components:
 
1241
            build_details = self._index.get_build_details(pending_components)
 
1242
            current_components = set(pending_components)
 
1243
            pending_components = set()
 
1244
            for key, details in viewitems(build_details):
 
1245
                (index_memo, compression_parent, parents,
 
1246
                 record_details) = details
 
1247
                if compression_parent is not None:
 
1248
                    pending_components.add(compression_parent)
 
1249
                component_data[key] = self._build_details_to_components(
 
1250
                    details)
 
1251
            missing = current_components.difference(build_details)
 
1252
            if missing and not allow_missing:
 
1253
                raise errors.RevisionNotPresent(missing.pop(), self)
 
1254
        return component_data
 
1255
 
 
1256
    def _get_content(self, key, parent_texts={}):
 
1257
        """Returns a content object that makes up the specified
 
1258
        version."""
 
1259
        cached_version = parent_texts.get(key, None)
 
1260
        if cached_version is not None:
 
1261
            # Ensure the cache dict is valid.
 
1262
            if not self.get_parent_map([key]):
 
1263
                raise RevisionNotPresent(key, self)
 
1264
            return cached_version
 
1265
        generator = _VFContentMapGenerator(self, [key])
 
1266
        return generator._get_content(key)
 
1267
 
 
1268
    def get_parent_map(self, keys):
 
1269
        """Get a map of the graph parents of keys.
 
1270
 
 
1271
        :param keys: The keys to look up parents for.
 
1272
        :return: A mapping from keys to parents. Absent keys are absent from
 
1273
            the mapping.
 
1274
        """
 
1275
        return self._get_parent_map_with_sources(keys)[0]
 
1276
 
 
1277
    def _get_parent_map_with_sources(self, keys):
 
1278
        """Get a map of the parents of keys.
 
1279
 
 
1280
        :param keys: The keys to look up parents for.
 
1281
        :return: A tuple. The first element is a mapping from keys to parents.
 
1282
            Absent keys are absent from the mapping. The second element is a
 
1283
            list with the locations each key was found in. The first element
 
1284
            is the in-this-knit parents, the second the first fallback source,
 
1285
            and so on.
 
1286
        """
 
1287
        result = {}
 
1288
        sources = [self._index] + self._immediate_fallback_vfs
 
1289
        source_results = []
 
1290
        missing = set(keys)
 
1291
        for source in sources:
 
1292
            if not missing:
 
1293
                break
 
1294
            new_result = source.get_parent_map(missing)
 
1295
            source_results.append(new_result)
 
1296
            result.update(new_result)
 
1297
            missing.difference_update(set(new_result))
 
1298
        return result, source_results
 
1299
 
 
1300
    def _get_record_map(self, keys, allow_missing=False):
 
1301
        """Produce a dictionary of knit records.
 
1302
 
 
1303
        :return: {key:(record, record_details, digest, next)}
 
1304
 
 
1305
            * record: data returned from read_records (a KnitContentobject)
 
1306
            * record_details: opaque information to pass to parse_record
 
1307
            * digest: SHA1 digest of the full text after all steps are done
 
1308
            * next: build-parent of the version, i.e. the leftmost ancestor.
 
1309
                Will be None if the record is not a delta.
 
1310
 
 
1311
        :param keys: The keys to build a map for
 
1312
        :param allow_missing: If some records are missing, rather than
 
1313
            error, just return the data that could be generated.
 
1314
        """
 
1315
        raw_map = self._get_record_map_unparsed(keys,
 
1316
                                                allow_missing=allow_missing)
 
1317
        return self._raw_map_to_record_map(raw_map)
 
1318
 
 
1319
    def _raw_map_to_record_map(self, raw_map):
 
1320
        """Parse the contents of _get_record_map_unparsed.
 
1321
 
 
1322
        :return: see _get_record_map.
 
1323
        """
 
1324
        result = {}
 
1325
        for key in raw_map:
 
1326
            data, record_details, next = raw_map[key]
 
1327
            content, digest = self._parse_record(key[-1], data)
 
1328
            result[key] = content, record_details, digest, next
 
1329
        return result
 
1330
 
 
1331
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1332
        """Get the raw data for reconstructing keys without parsing it.
 
1333
 
 
1334
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1335
            key-> raw_bytes, (method, noeol), compression_parent
 
1336
        """
 
1337
        # This retries the whole request if anything fails. Potentially we
 
1338
        # could be a bit more selective. We could track the keys whose records
 
1339
        # we have successfully found, and then only request the new records
 
1340
        # from there. However, _get_components_positions grabs the whole build
 
1341
        # chain, which means we'll likely try to grab the same records again
 
1342
        # anyway. Also, can the build chains change as part of a pack
 
1343
        # operation? We wouldn't want to end up with a broken chain.
 
1344
        while True:
 
1345
            try:
 
1346
                position_map = self._get_components_positions(keys,
 
1347
                                                              allow_missing=allow_missing)
 
1348
                # key = component_id, r = record_details, i_m = index_memo,
 
1349
                # n = next
 
1350
                records = [(key, i_m) for key, (r, i_m, n)
 
1351
                           in viewitems(position_map)]
 
1352
                # Sort by the index memo, so that we request records from the
 
1353
                # same pack file together, and in forward-sorted order
 
1354
                records.sort(key=operator.itemgetter(1))
 
1355
                raw_record_map = {}
 
1356
                for key, data in self._read_records_iter_unchecked(records):
 
1357
                    (record_details, index_memo, next) = position_map[key]
 
1358
                    raw_record_map[key] = data, record_details, next
 
1359
                return raw_record_map
 
1360
            except errors.RetryWithNewPacks as e:
 
1361
                self._access.reload_or_raise(e)
 
1362
 
 
1363
    @classmethod
 
1364
    def _split_by_prefix(cls, keys):
 
1365
        """For the given keys, split them up based on their prefix.
 
1366
 
 
1367
        To keep memory pressure somewhat under control, split the
 
1368
        requests back into per-file-id requests, otherwise "bzr co"
 
1369
        extracts the full tree into memory before writing it to disk.
 
1370
        This should be revisited if _get_content_maps() can ever cross
 
1371
        file-id boundaries.
 
1372
 
 
1373
        The keys for a given file_id are kept in the same relative order.
 
1374
        Ordering between file_ids is not, though prefix_order will return the
 
1375
        order that the key was first seen.
 
1376
 
 
1377
        :param keys: An iterable of key tuples
 
1378
        :return: (split_map, prefix_order)
 
1379
            split_map       A dictionary mapping prefix => keys
 
1380
            prefix_order    The order that we saw the various prefixes
 
1381
        """
 
1382
        split_by_prefix = {}
 
1383
        prefix_order = []
 
1384
        for key in keys:
 
1385
            if len(key) == 1:
 
1386
                prefix = b''
 
1387
            else:
 
1388
                prefix = key[0]
 
1389
 
 
1390
            if prefix in split_by_prefix:
 
1391
                split_by_prefix[prefix].append(key)
 
1392
            else:
 
1393
                split_by_prefix[prefix] = [key]
 
1394
                prefix_order.append(prefix)
 
1395
        return split_by_prefix, prefix_order
 
1396
 
 
1397
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1398
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1399
        """For the given keys, group them into 'best-sized' requests.
 
1400
 
 
1401
        The idea is to avoid making 1 request per file, but to never try to
 
1402
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1403
        possible, we should try to group requests to the same pack file
 
1404
        together.
 
1405
 
 
1406
        :return: list of (keys, non_local) tuples that indicate what keys
 
1407
            should be fetched next.
 
1408
        """
 
1409
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1410
        #       from the same pack file together, and we want to extract all
 
1411
        #       the texts for a given build-chain together. Ultimately it
 
1412
        #       probably needs a better global view.
 
1413
        total_keys = len(keys)
 
1414
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1415
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1416
        cur_keys = []
 
1417
        cur_non_local = set()
 
1418
        cur_size = 0
 
1419
        result = []
 
1420
        sizes = []
 
1421
        for prefix in prefix_order:
 
1422
            keys = prefix_split_keys[prefix]
 
1423
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1424
 
 
1425
            this_size = self._index._get_total_build_size(keys, positions)
 
1426
            cur_size += this_size
 
1427
            cur_keys.extend(keys)
 
1428
            cur_non_local.update(non_local)
 
1429
            if cur_size > _min_buffer_size:
 
1430
                result.append((cur_keys, cur_non_local))
 
1431
                sizes.append(cur_size)
 
1432
                cur_keys = []
 
1433
                cur_non_local = set()
 
1434
                cur_size = 0
 
1435
        if cur_keys:
 
1436
            result.append((cur_keys, cur_non_local))
 
1437
            sizes.append(cur_size)
 
1438
        return result
 
1439
 
 
1440
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
1441
        """Get a stream of records for keys.
 
1442
 
 
1443
        :param keys: The keys to include.
 
1444
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
1445
            sorted stream has compression parents strictly before their
 
1446
            children.
 
1447
        :param include_delta_closure: If True then the closure across any
 
1448
            compression parents will be included (in the opaque data).
 
1449
        :return: An iterator of ContentFactory objects, each of which is only
 
1450
            valid until the iterator is advanced.
 
1451
        """
 
1452
        # keys might be a generator
 
1453
        keys = set(keys)
 
1454
        if not keys:
 
1455
            return
 
1456
        if not self._index.has_graph:
 
1457
            # Cannot sort when no graph has been stored.
 
1458
            ordering = 'unordered'
 
1459
 
 
1460
        remaining_keys = keys
 
1461
        while True:
 
1462
            try:
 
1463
                keys = set(remaining_keys)
 
1464
                for content_factory in self._get_remaining_record_stream(keys,
 
1465
                                                                         ordering, include_delta_closure):
 
1466
                    remaining_keys.discard(content_factory.key)
 
1467
                    yield content_factory
 
1468
                return
 
1469
            except errors.RetryWithNewPacks as e:
 
1470
                self._access.reload_or_raise(e)
 
1471
 
 
1472
    def _get_remaining_record_stream(self, keys, ordering,
 
1473
                                     include_delta_closure):
 
1474
        """This function is the 'retry' portion for get_record_stream."""
 
1475
        if include_delta_closure:
 
1476
            positions = self._get_components_positions(
 
1477
                keys, allow_missing=True)
 
1478
        else:
 
1479
            build_details = self._index.get_build_details(keys)
 
1480
            # map from key to
 
1481
            # (record_details, access_memo, compression_parent_key)
 
1482
            positions = dict((key, self._build_details_to_components(details))
 
1483
                             for key, details in viewitems(build_details))
 
1484
        absent_keys = keys.difference(set(positions))
 
1485
        # There may be more absent keys : if we're missing the basis component
 
1486
        # and are trying to include the delta closure.
 
1487
        # XXX: We should not ever need to examine remote sources because we do
 
1488
        # not permit deltas across versioned files boundaries.
 
1489
        if include_delta_closure:
 
1490
            needed_from_fallback = set()
 
1491
            # Build up reconstructable_keys dict.  key:True in this dict means
 
1492
            # the key can be reconstructed.
 
1493
            reconstructable_keys = {}
 
1494
            for key in keys:
 
1495
                # the delta chain
 
1496
                try:
 
1497
                    chain = [key, positions[key][2]]
 
1498
                except KeyError:
 
1499
                    needed_from_fallback.add(key)
 
1500
                    continue
 
1501
                result = True
 
1502
                while chain[-1] is not None:
 
1503
                    if chain[-1] in reconstructable_keys:
 
1504
                        result = reconstructable_keys[chain[-1]]
 
1505
                        break
 
1506
                    else:
 
1507
                        try:
 
1508
                            chain.append(positions[chain[-1]][2])
 
1509
                        except KeyError:
 
1510
                            # missing basis component
 
1511
                            needed_from_fallback.add(chain[-1])
 
1512
                            result = True
 
1513
                            break
 
1514
                for chain_key in chain[:-1]:
 
1515
                    reconstructable_keys[chain_key] = result
 
1516
                if not result:
 
1517
                    needed_from_fallback.add(key)
 
1518
        # Double index lookups here : need a unified api ?
 
1519
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
 
1520
        if ordering in ('topological', 'groupcompress'):
 
1521
            if ordering == 'topological':
 
1522
                # Global topological sort
 
1523
                present_keys = tsort.topo_sort(global_map)
 
1524
            else:
 
1525
                present_keys = sort_groupcompress(global_map)
 
1526
            # Now group by source:
 
1527
            source_keys = []
 
1528
            current_source = None
 
1529
            for key in present_keys:
 
1530
                for parent_map in parent_maps:
 
1531
                    if key in parent_map:
 
1532
                        key_source = parent_map
 
1533
                        break
 
1534
                if current_source is not key_source:
 
1535
                    source_keys.append((key_source, []))
 
1536
                    current_source = key_source
 
1537
                source_keys[-1][1].append(key)
 
1538
        else:
 
1539
            if ordering != 'unordered':
 
1540
                raise AssertionError('valid values for ordering are:'
 
1541
                                     ' "unordered", "groupcompress" or "topological" not: %r'
 
1542
                                     % (ordering,))
 
1543
            # Just group by source; remote sources first.
 
1544
            present_keys = []
 
1545
            source_keys = []
 
1546
            for parent_map in reversed(parent_maps):
 
1547
                source_keys.append((parent_map, []))
 
1548
                for key in parent_map:
 
1549
                    present_keys.append(key)
 
1550
                    source_keys[-1][1].append(key)
 
1551
            # We have been requested to return these records in an order that
 
1552
            # suits us. So we ask the index to give us an optimally sorted
 
1553
            # order.
 
1554
            for source, sub_keys in source_keys:
 
1555
                if source is parent_maps[0]:
 
1556
                    # Only sort the keys for this VF
 
1557
                    self._index._sort_keys_by_io(sub_keys, positions)
 
1558
        absent_keys = keys - set(global_map)
 
1559
        for key in absent_keys:
 
1560
            yield AbsentContentFactory(key)
 
1561
        # restrict our view to the keys we can answer.
 
1562
        # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
 
1563
        # XXX: At that point we need to consider the impact of double reads by
 
1564
        # utilising components multiple times.
 
1565
        if include_delta_closure:
 
1566
            # XXX: get_content_maps performs its own index queries; allow state
 
1567
            # to be passed in.
 
1568
            non_local_keys = needed_from_fallback - absent_keys
 
1569
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1570
                                                                non_local_keys,
 
1571
                                                                positions):
 
1572
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1573
                                                   global_map,
 
1574
                                                   ordering=ordering)
 
1575
                for record in generator.get_record_stream():
 
1576
                    yield record
 
1577
        else:
 
1578
            for source, keys in source_keys:
 
1579
                if source is parent_maps[0]:
 
1580
                    # this KnitVersionedFiles
 
1581
                    records = [(key, positions[key][1]) for key in keys]
 
1582
                    for key, raw_data in self._read_records_iter_unchecked(records):
 
1583
                        (record_details, index_memo, _) = positions[key]
 
1584
                        yield KnitContentFactory(key, global_map[key],
 
1585
                                                 record_details, None, raw_data, self._factory.annotated, None)
 
1586
                else:
 
1587
                    vf = self._immediate_fallback_vfs[parent_maps.index(
 
1588
                        source) - 1]
 
1589
                    for record in vf.get_record_stream(keys, ordering,
 
1590
                                                       include_delta_closure):
 
1591
                        yield record
 
1592
 
 
1593
    def get_sha1s(self, keys):
 
1594
        """See VersionedFiles.get_sha1s()."""
 
1595
        missing = set(keys)
 
1596
        record_map = self._get_record_map(missing, allow_missing=True)
 
1597
        result = {}
 
1598
        for key, details in viewitems(record_map):
 
1599
            if key not in missing:
 
1600
                continue
 
1601
            # record entry 2 is the 'digest'.
 
1602
            result[key] = details[2]
 
1603
        missing.difference_update(set(result))
 
1604
        for source in self._immediate_fallback_vfs:
 
1605
            if not missing:
 
1606
                break
 
1607
            new_result = source.get_sha1s(missing)
 
1608
            result.update(new_result)
 
1609
            missing.difference_update(set(new_result))
 
1610
        return result
 
1611
 
 
1612
    def insert_record_stream(self, stream):
 
1613
        """Insert a record stream into this container.
 
1614
 
 
1615
        :param stream: A stream of records to insert.
 
1616
        :return: None
 
1617
        :seealso VersionedFiles.get_record_stream:
 
1618
        """
 
1619
        def get_adapter(adapter_key):
 
1620
            try:
 
1621
                return adapters[adapter_key]
 
1622
            except KeyError:
 
1623
                adapter_factory = adapter_registry.get(adapter_key)
 
1624
                adapter = adapter_factory(self)
 
1625
                adapters[adapter_key] = adapter
 
1626
                return adapter
 
1627
        delta_types = set()
 
1628
        if self._factory.annotated:
 
1629
            # self is annotated, we need annotated knits to use directly.
 
1630
            annotated = "annotated-"
 
1631
            convertibles = []
 
1632
        else:
 
1633
            # self is not annotated, but we can strip annotations cheaply.
 
1634
            annotated = ""
 
1635
            convertibles = {"knit-annotated-ft-gz"}
 
1636
            if self._max_delta_chain:
 
1637
                delta_types.add("knit-annotated-delta-gz")
 
1638
                convertibles.add("knit-annotated-delta-gz")
 
1639
        # The set of types we can cheaply adapt without needing basis texts.
 
1640
        native_types = set()
 
1641
        if self._max_delta_chain:
 
1642
            native_types.add("knit-%sdelta-gz" % annotated)
 
1643
            delta_types.add("knit-%sdelta-gz" % annotated)
 
1644
        native_types.add("knit-%sft-gz" % annotated)
 
1645
        knit_types = native_types.union(convertibles)
 
1646
        adapters = {}
 
1647
        # Buffer all index entries that we can't add immediately because their
 
1648
        # basis parent is missing. We don't buffer all because generating
 
1649
        # annotations may require access to some of the new records. However we
 
1650
        # can't generate annotations from new deltas until their basis parent
 
1651
        # is present anyway, so we get away with not needing an index that
 
1652
        # includes the new keys.
 
1653
        #
 
1654
        # See <http://launchpad.net/bugs/300177> about ordering of compression
 
1655
        # parents in the records - to be conservative, we insist that all
 
1656
        # parents must be present to avoid expanding to a fulltext.
 
1657
        #
 
1658
        # key = basis_parent, value = index entry to add
 
1659
        buffered_index_entries = {}
 
1660
        for record in stream:
 
1661
            kind = record.storage_kind
 
1662
            if kind.startswith('knit-') and kind.endswith('-gz'):
 
1663
                # Check that the ID in the header of the raw knit bytes matches
 
1664
                # the record metadata.
 
1665
                raw_data = record._raw_record
 
1666
                df, rec = self._parse_record_header(record.key, raw_data)
 
1667
                df.close()
 
1668
            buffered = False
 
1669
            parents = record.parents
 
1670
            if record.storage_kind in delta_types:
 
1671
                # TODO: eventually the record itself should track
 
1672
                #       compression_parent
 
1673
                compression_parent = parents[0]
 
1674
            else:
 
1675
                compression_parent = None
 
1676
            # Raise an error when a record is missing.
 
1677
            if record.storage_kind == 'absent':
 
1678
                raise RevisionNotPresent([record.key], self)
 
1679
            elif ((record.storage_kind in knit_types) and
 
1680
                  (compression_parent is None or
 
1681
                   not self._immediate_fallback_vfs or
 
1682
                   compression_parent in self._index or
 
1683
                   compression_parent not in self)):
 
1684
                # we can insert the knit record literally if either it has no
 
1685
                # compression parent OR we already have its basis in this kvf
 
1686
                # OR the basis is not present even in the fallbacks.  In the
 
1687
                # last case it will either turn up later in the stream and all
 
1688
                # will be well, or it won't turn up at all and we'll raise an
 
1689
                # error at the end.
 
1690
                #
 
1691
                # TODO: self.__contains__ is somewhat redundant with
 
1692
                # self._index.__contains__; we really want something that directly
 
1693
                # asks if it's only present in the fallbacks. -- mbp 20081119
 
1694
                if record.storage_kind not in native_types:
 
1695
                    try:
 
1696
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1697
                        adapter = get_adapter(adapter_key)
 
1698
                    except KeyError:
 
1699
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1700
                        adapter = get_adapter(adapter_key)
 
1701
                    bytes = adapter.get_bytes(record)
 
1702
                else:
 
1703
                    # It's a knit record, it has a _raw_record field (even if
 
1704
                    # it was reconstituted from a network stream).
 
1705
                    bytes = record._raw_record
 
1706
                options = [record._build_details[0].encode('ascii')]
 
1707
                if record._build_details[1]:
 
1708
                    options.append(b'no-eol')
 
1709
                # Just blat it across.
 
1710
                # Note: This does end up adding data on duplicate keys. As
 
1711
                # modern repositories use atomic insertions this should not
 
1712
                # lead to excessive growth in the event of interrupted fetches.
 
1713
                # 'knit' repositories may suffer excessive growth, but as a
 
1714
                # deprecated format this is tolerable. It can be fixed if
 
1715
                # needed by in the kndx index support raising on a duplicate
 
1716
                # add with identical parents and options.
 
1717
                access_memo = self._access.add_raw_records(
 
1718
                    [(record.key, len(bytes))], bytes)[0]
 
1719
                index_entry = (record.key, options, access_memo, parents)
 
1720
                if b'fulltext' not in options:
 
1721
                    # Not a fulltext, so we need to make sure the compression
 
1722
                    # parent will also be present.
 
1723
                    # Note that pack backed knits don't need to buffer here
 
1724
                    # because they buffer all writes to the transaction level,
 
1725
                    # but we don't expose that difference at the index level. If
 
1726
                    # the query here has sufficient cost to show up in
 
1727
                    # profiling we should do that.
 
1728
                    #
 
1729
                    # They're required to be physically in this
 
1730
                    # KnitVersionedFiles, not in a fallback.
 
1731
                    if compression_parent not in self._index:
 
1732
                        pending = buffered_index_entries.setdefault(
 
1733
                            compression_parent, [])
 
1734
                        pending.append(index_entry)
 
1735
                        buffered = True
 
1736
                if not buffered:
 
1737
                    self._index.add_records([index_entry])
 
1738
            elif record.storage_kind == 'chunked':
 
1739
                self.add_lines(record.key, parents,
 
1740
                               osutils.chunks_to_lines(record.get_bytes_as('chunked')))
 
1741
            else:
 
1742
                # Not suitable for direct insertion as a
 
1743
                # delta, either because it's not the right format, or this
 
1744
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
 
1745
                # 0) or because it depends on a base only present in the
 
1746
                # fallback kvfs.
 
1747
                self._access.flush()
 
1748
                try:
 
1749
                    # Try getting a fulltext directly from the record.
 
1750
                    bytes = record.get_bytes_as('fulltext')
 
1751
                except errors.UnavailableRepresentation:
 
1752
                    adapter_key = record.storage_kind, 'fulltext'
 
1753
                    adapter = get_adapter(adapter_key)
 
1754
                    bytes = adapter.get_bytes(record)
 
1755
                lines = split_lines(bytes)
 
1756
                try:
 
1757
                    self.add_lines(record.key, parents, lines)
 
1758
                except errors.RevisionAlreadyPresent:
 
1759
                    pass
 
1760
            # Add any records whose basis parent is now available.
 
1761
            if not buffered:
 
1762
                added_keys = [record.key]
 
1763
                while added_keys:
 
1764
                    key = added_keys.pop(0)
 
1765
                    if key in buffered_index_entries:
 
1766
                        index_entries = buffered_index_entries[key]
 
1767
                        self._index.add_records(index_entries)
 
1768
                        added_keys.extend(
 
1769
                            [index_entry[0] for index_entry in index_entries])
 
1770
                        del buffered_index_entries[key]
 
1771
        if buffered_index_entries:
 
1772
            # There were index entries buffered at the end of the stream,
 
1773
            # So these need to be added (if the index supports holding such
 
1774
            # entries for later insertion)
 
1775
            all_entries = []
 
1776
            for key in buffered_index_entries:
 
1777
                index_entries = buffered_index_entries[key]
 
1778
                all_entries.extend(index_entries)
 
1779
            self._index.add_records(
 
1780
                all_entries, missing_compression_parents=True)
 
1781
 
 
1782
    def get_missing_compression_parent_keys(self):
 
1783
        """Return an iterable of keys of missing compression parents.
 
1784
 
 
1785
        Check this after calling insert_record_stream to find out if there are
 
1786
        any missing compression parents.  If there are, the records that
 
1787
        depend on them are not able to be inserted safely. For atomic
 
1788
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1789
        suspended - commit will fail at this point. Nonatomic knits will error
 
1790
        earlier because they have no staging area to put pending entries into.
 
1791
        """
 
1792
        return self._index.get_missing_compression_parents()
 
1793
 
 
1794
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
1795
        """Iterate over the lines in the versioned files from keys.
 
1796
 
 
1797
        This may return lines from other keys. Each item the returned
 
1798
        iterator yields is a tuple of a line and a text version that that line
 
1799
        is present in (not introduced in).
 
1800
 
 
1801
        Ordering of results is in whatever order is most suitable for the
 
1802
        underlying storage format.
 
1803
 
 
1804
        If a progress bar is supplied, it may be used to indicate progress.
 
1805
        The caller is responsible for cleaning up progress bars (because this
 
1806
        is an iterator).
 
1807
 
 
1808
        NOTES:
 
1809
         * Lines are normalised by the underlying store: they will all have \\n
 
1810
           terminators.
 
1811
         * Lines are returned in arbitrary order.
 
1812
         * If a requested key did not change any lines (or didn't have any
 
1813
           lines), it may not be mentioned at all in the result.
 
1814
 
 
1815
        :param pb: Progress bar supplied by caller.
 
1816
        :return: An iterator over (line, key).
 
1817
        """
 
1818
        if pb is None:
 
1819
            pb = ui.ui_factory.nested_progress_bar()
 
1820
        keys = set(keys)
 
1821
        total = len(keys)
 
1822
        done = False
 
1823
        while not done:
 
1824
            try:
 
1825
                # we don't care about inclusions, the caller cares.
 
1826
                # but we need to setup a list of records to visit.
 
1827
                # we need key, position, length
 
1828
                key_records = []
 
1829
                build_details = self._index.get_build_details(keys)
 
1830
                for key, details in viewitems(build_details):
 
1831
                    if key in keys:
 
1832
                        key_records.append((key, details[0]))
 
1833
                records_iter = enumerate(self._read_records_iter(key_records))
 
1834
                for (key_idx, (key, data, sha_value)) in records_iter:
 
1835
                    pb.update(gettext('Walking content'), key_idx, total)
 
1836
                    compression_parent = build_details[key][1]
 
1837
                    if compression_parent is None:
 
1838
                        # fulltext
 
1839
                        line_iterator = self._factory.get_fulltext_content(
 
1840
                            data)
 
1841
                    else:
 
1842
                        # Delta
 
1843
                        line_iterator = self._factory.get_linedelta_content(
 
1844
                            data)
 
1845
                    # Now that we are yielding the data for this key, remove it
 
1846
                    # from the list
 
1847
                    keys.remove(key)
 
1848
                    # XXX: It might be more efficient to yield (key,
 
1849
                    # line_iterator) in the future. However for now, this is a
 
1850
                    # simpler change to integrate into the rest of the
 
1851
                    # codebase. RBC 20071110
 
1852
                    for line in line_iterator:
 
1853
                        yield line, key
 
1854
                done = True
 
1855
            except errors.RetryWithNewPacks as e:
 
1856
                self._access.reload_or_raise(e)
 
1857
        # If there are still keys we've not yet found, we look in the fallback
 
1858
        # vfs, and hope to find them there.  Note that if the keys are found
 
1859
        # but had no changes or no content, the fallback may not return
 
1860
        # anything.
 
1861
        if keys and not self._immediate_fallback_vfs:
 
1862
            # XXX: strictly the second parameter is meant to be the file id
 
1863
            # but it's not easily accessible here.
 
1864
            raise RevisionNotPresent(keys, repr(self))
 
1865
        for source in self._immediate_fallback_vfs:
 
1866
            if not keys:
 
1867
                break
 
1868
            source_keys = set()
 
1869
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
 
1870
                source_keys.add(key)
 
1871
                yield line, key
 
1872
            keys.difference_update(source_keys)
 
1873
        pb.update(gettext('Walking content'), total, total)
 
1874
 
 
1875
    def _make_line_delta(self, delta_seq, new_content):
 
1876
        """Generate a line delta from delta_seq and new_content."""
 
1877
        diff_hunks = []
 
1878
        for op in delta_seq.get_opcodes():
 
1879
            if op[0] == 'equal':
 
1880
                continue
 
1881
            diff_hunks.append(
 
1882
                (op[1], op[2], op[4] - op[3], new_content._lines[op[3]:op[4]]))
 
1883
        return diff_hunks
 
1884
 
 
1885
    def _merge_annotations(self, content, parents, parent_texts={},
 
1886
                           delta=None, annotated=None,
 
1887
                           left_matching_blocks=None):
 
1888
        """Merge annotations for content and generate deltas.
 
1889
 
 
1890
        This is done by comparing the annotations based on changes to the text
 
1891
        and generating a delta on the resulting full texts. If annotations are
 
1892
        not being created then a simple delta is created.
 
1893
        """
 
1894
        if left_matching_blocks is not None:
 
1895
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
 
1896
        else:
 
1897
            delta_seq = None
 
1898
        if annotated:
 
1899
            for parent_key in parents:
 
1900
                merge_content = self._get_content(parent_key, parent_texts)
 
1901
                if (parent_key == parents[0] and delta_seq is not None):
 
1902
                    seq = delta_seq
 
1903
                else:
 
1904
                    seq = patiencediff.PatienceSequenceMatcher(
 
1905
                        None, merge_content.text(), content.text())
 
1906
                for i, j, n in seq.get_matching_blocks():
 
1907
                    if n == 0:
 
1908
                        continue
 
1909
                    # this copies (origin, text) pairs across to the new
 
1910
                    # content for any line that matches the last-checked
 
1911
                    # parent.
 
1912
                    content._lines[j:j + n] = merge_content._lines[i:i + n]
 
1913
            # XXX: Robert says the following block is a workaround for a
 
1914
            # now-fixed bug and it can probably be deleted. -- mbp 20080618
 
1915
            if content._lines and not content._lines[-1][1].endswith(b'\n'):
 
1916
                # The copied annotation was from a line without a trailing EOL,
 
1917
                # reinstate one for the content object, to ensure correct
 
1918
                # serialization.
 
1919
                line = content._lines[-1][1] + b'\n'
 
1920
                content._lines[-1] = (content._lines[-1][0], line)
 
1921
        if delta:
 
1922
            if delta_seq is None:
 
1923
                reference_content = self._get_content(parents[0], parent_texts)
 
1924
                new_texts = content.text()
 
1925
                old_texts = reference_content.text()
 
1926
                delta_seq = patiencediff.PatienceSequenceMatcher(
 
1927
                    None, old_texts, new_texts)
 
1928
            return self._make_line_delta(delta_seq, content)
 
1929
 
 
1930
    def _parse_record(self, version_id, data):
 
1931
        """Parse an original format knit record.
 
1932
 
 
1933
        These have the last element of the key only present in the stored data.
 
1934
        """
 
1935
        rec, record_contents = self._parse_record_unchecked(data)
 
1936
        self._check_header_version(rec, version_id)
 
1937
        return record_contents, rec[3]
 
1938
 
 
1939
    def _parse_record_header(self, key, raw_data):
 
1940
        """Parse a record header for consistency.
 
1941
 
 
1942
        :return: the header and the decompressor stream.
 
1943
                 as (stream, header_record)
 
1944
        """
 
1945
        df = gzip.GzipFile(mode='rb', fileobj=BytesIO(raw_data))
 
1946
        try:
 
1947
            # Current serialise
 
1948
            rec = self._check_header(key, df.readline())
 
1949
        except Exception as e:
 
1950
            raise KnitCorrupt(self,
 
1951
                              "While reading {%s} got %s(%s)"
 
1952
                              % (key, e.__class__.__name__, str(e)))
 
1953
        return df, rec
 
1954
 
 
1955
    def _parse_record_unchecked(self, data):
 
1956
        # profiling notes:
 
1957
        # 4168 calls in 2880 217 internal
 
1958
        # 4168 calls to _parse_record_header in 2121
 
1959
        # 4168 calls to readlines in 330
 
1960
        with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
 
1961
            try:
 
1962
                record_contents = df.readlines()
 
1963
            except Exception as e:
 
1964
                raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
 
1965
                                  (data, e.__class__.__name__, str(e)))
 
1966
            header = record_contents.pop(0)
 
1967
            rec = self._split_header(header)
 
1968
            last_line = record_contents.pop()
 
1969
            if len(record_contents) != int(rec[2]):
 
1970
                raise KnitCorrupt(self,
 
1971
                                  'incorrect number of lines %s != %s'
 
1972
                                  ' for version {%s} %s'
 
1973
                                  % (len(record_contents), int(rec[2]),
 
1974
                                     rec[1], record_contents))
 
1975
            if last_line != b'end %s\n' % rec[1]:
 
1976
                raise KnitCorrupt(self,
 
1977
                                  'unexpected version end line %r, wanted %r'
 
1978
                                  % (last_line, rec[1]))
 
1979
        return rec, record_contents
 
1980
 
 
1981
    def _read_records_iter(self, records):
 
1982
        """Read text records from data file and yield result.
 
1983
 
 
1984
        The result will be returned in whatever is the fastest to read.
 
1985
        Not by the order requested. Also, multiple requests for the same
 
1986
        record will only yield 1 response.
 
1987
 
 
1988
        :param records: A list of (key, access_memo) entries
 
1989
        :return: Yields (key, contents, digest) in the order
 
1990
                 read, not the order requested
 
1991
        """
 
1992
        if not records:
 
1993
            return
 
1994
 
 
1995
        # XXX: This smells wrong, IO may not be getting ordered right.
 
1996
        needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1997
        if not needed_records:
 
1998
            return
 
1999
 
 
2000
        # The transport optimizes the fetching as well
 
2001
        # (ie, reads continuous ranges.)
 
2002
        raw_data = self._access.get_raw_records(
 
2003
            [index_memo for key, index_memo in needed_records])
 
2004
 
 
2005
        for (key, index_memo), data in zip(needed_records, raw_data):
 
2006
            content, digest = self._parse_record(key[-1], data)
 
2007
            yield key, content, digest
 
2008
 
 
2009
    def _read_records_iter_raw(self, records):
 
2010
        """Read text records from data file and yield raw data.
 
2011
 
 
2012
        This unpacks enough of the text record to validate the id is
 
2013
        as expected but thats all.
 
2014
 
 
2015
        Each item the iterator yields is (key, bytes,
 
2016
            expected_sha1_of_full_text).
 
2017
        """
 
2018
        for key, data in self._read_records_iter_unchecked(records):
 
2019
            # validate the header (note that we can only use the suffix in
 
2020
            # current knit records).
 
2021
            df, rec = self._parse_record_header(key, data)
 
2022
            df.close()
 
2023
            yield key, data, rec[3]
 
2024
 
 
2025
    def _read_records_iter_unchecked(self, records):
 
2026
        """Read text records from data file and yield raw data.
 
2027
 
 
2028
        No validation is done.
 
2029
 
 
2030
        Yields tuples of (key, data).
 
2031
        """
 
2032
        # setup an iterator of the external records:
 
2033
        # uses readv so nice and fast we hope.
 
2034
        if len(records):
 
2035
            # grab the disk data needed.
 
2036
            needed_offsets = [index_memo for key, index_memo
 
2037
                              in records]
 
2038
            raw_records = self._access.get_raw_records(needed_offsets)
 
2039
 
 
2040
        for key, index_memo in records:
 
2041
            data = next(raw_records)
 
2042
            yield key, data
 
2043
 
 
2044
    def _record_to_data(self, key, digest, lines, dense_lines=None):
 
2045
        """Convert key, digest, lines into a raw data block.
 
2046
 
 
2047
        :param key: The key of the record. Currently keys are always serialised
 
2048
            using just the trailing component.
 
2049
        :param dense_lines: The bytes of lines but in a denser form. For
 
2050
            instance, if lines is a list of 1000 bytestrings each ending in
 
2051
            \\n, dense_lines may be a list with one line in it, containing all
 
2052
            the 1000's lines and their \\n's. Using dense_lines if it is
 
2053
            already known is a win because the string join to create bytes in
 
2054
            this function spends less time resizing the final string.
 
2055
        :return: (len, a BytesIO instance with the raw data ready to read.)
 
2056
        """
 
2057
        chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
 
2058
        chunks.extend(dense_lines or lines)
 
2059
        chunks.append(b"end " + key[-1] + b"\n")
 
2060
        for chunk in chunks:
 
2061
            if not isinstance(chunk, bytes):
 
2062
                raise AssertionError(
 
2063
                    'data must be plain bytes was %s' % type(chunk))
 
2064
        if lines and not lines[-1].endswith(b'\n'):
 
2065
            raise ValueError('corrupt lines value %r' % lines)
 
2066
        compressed_bytes = b''.join(tuned_gzip.chunks_to_gzip(chunks))
 
2067
        return len(compressed_bytes), compressed_bytes
 
2068
 
 
2069
    def _split_header(self, line):
 
2070
        rec = line.split()
 
2071
        if len(rec) != 4:
 
2072
            raise KnitCorrupt(self,
 
2073
                              'unexpected number of elements in record header')
 
2074
        return rec
 
2075
 
 
2076
    def keys(self):
 
2077
        """See VersionedFiles.keys."""
 
2078
        if 'evil' in debug.debug_flags:
 
2079
            trace.mutter_callsite(2, "keys scales with size of history")
 
2080
        sources = [self._index] + self._immediate_fallback_vfs
 
2081
        result = set()
 
2082
        for source in sources:
 
2083
            result.update(source.keys())
 
2084
        return result
 
2085
 
 
2086
 
 
2087
class _ContentMapGenerator(object):
 
2088
    """Generate texts or expose raw deltas for a set of texts."""
 
2089
 
 
2090
    def __init__(self, ordering='unordered'):
 
2091
        self._ordering = ordering
 
2092
 
 
2093
    def _get_content(self, key):
 
2094
        """Get the content object for key."""
 
2095
        # Note that _get_content is only called when the _ContentMapGenerator
 
2096
        # has been constructed with just one key requested for reconstruction.
 
2097
        if key in self.nonlocal_keys:
 
2098
            record = next(self.get_record_stream())
 
2099
            # Create a content object on the fly
 
2100
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
2101
            return PlainKnitContent(lines, record.key)
 
2102
        else:
 
2103
            # local keys we can ask for directly
 
2104
            return self._get_one_work(key)
 
2105
 
 
2106
    def get_record_stream(self):
 
2107
        """Get a record stream for the keys requested during __init__."""
 
2108
        for record in self._work():
 
2109
            yield record
 
2110
 
 
2111
    def _work(self):
 
2112
        """Produce maps of text and KnitContents as dicts.
 
2113
 
 
2114
        :return: (text_map, content_map) where text_map contains the texts for
 
2115
            the requested versions and content_map contains the KnitContents.
 
2116
        """
 
2117
        # NB: By definition we never need to read remote sources unless texts
 
2118
        # are requested from them: we don't delta across stores - and we
 
2119
        # explicitly do not want to to prevent data loss situations.
 
2120
        if self.global_map is None:
 
2121
            self.global_map = self.vf.get_parent_map(self.keys)
 
2122
        nonlocal_keys = self.nonlocal_keys
 
2123
 
 
2124
        missing_keys = set(nonlocal_keys)
 
2125
        # Read from remote versioned file instances and provide to our caller.
 
2126
        for source in self.vf._immediate_fallback_vfs:
 
2127
            if not missing_keys:
 
2128
                break
 
2129
            # Loop over fallback repositories asking them for texts - ignore
 
2130
            # any missing from a particular fallback.
 
2131
            for record in source.get_record_stream(missing_keys,
 
2132
                                                   self._ordering, True):
 
2133
                if record.storage_kind == 'absent':
 
2134
                    # Not in thie particular stream, may be in one of the
 
2135
                    # other fallback vfs objects.
 
2136
                    continue
 
2137
                missing_keys.remove(record.key)
 
2138
                yield record
 
2139
 
 
2140
        if self._raw_record_map is None:
 
2141
            raise AssertionError('_raw_record_map should have been filled')
 
2142
        first = True
 
2143
        for key in self.keys:
 
2144
            if key in self.nonlocal_keys:
 
2145
                continue
 
2146
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2147
            first = False
 
2148
 
 
2149
    def _get_one_work(self, requested_key):
 
2150
        # Now, if we have calculated everything already, just return the
 
2151
        # desired text.
 
2152
        if requested_key in self._contents_map:
 
2153
            return self._contents_map[requested_key]
 
2154
        # To simplify things, parse everything at once - code that wants one text
 
2155
        # probably wants them all.
 
2156
        # FUTURE: This function could be improved for the 'extract many' case
 
2157
        # by tracking each component and only doing the copy when the number of
 
2158
        # children than need to apply delta's to it is > 1 or it is part of the
 
2159
        # final output.
 
2160
        multiple_versions = len(self.keys) != 1
 
2161
        if self._record_map is None:
 
2162
            self._record_map = self.vf._raw_map_to_record_map(
 
2163
                self._raw_record_map)
 
2164
        record_map = self._record_map
 
2165
        # raw_record_map is key:
 
2166
        # Have read and parsed records at this point.
 
2167
        for key in self.keys:
 
2168
            if key in self.nonlocal_keys:
 
2169
                # already handled
 
2170
                continue
 
2171
            components = []
 
2172
            cursor = key
 
2173
            while cursor is not None:
 
2174
                try:
 
2175
                    record, record_details, digest, next = record_map[cursor]
 
2176
                except KeyError:
 
2177
                    raise RevisionNotPresent(cursor, self)
 
2178
                components.append((cursor, record, record_details, digest))
 
2179
                cursor = next
 
2180
                if cursor in self._contents_map:
 
2181
                    # no need to plan further back
 
2182
                    components.append((cursor, None, None, None))
 
2183
                    break
 
2184
 
 
2185
            content = None
 
2186
            for (component_id, record, record_details,
 
2187
                 digest) in reversed(components):
 
2188
                if component_id in self._contents_map:
 
2189
                    content = self._contents_map[component_id]
 
2190
                else:
 
2191
                    content, delta = self._factory.parse_record(key[-1],
 
2192
                                                                record, record_details, content,
 
2193
                                                                copy_base_content=multiple_versions)
 
2194
                    if multiple_versions:
 
2195
                        self._contents_map[component_id] = content
 
2196
 
 
2197
            # digest here is the digest from the last applied component.
 
2198
            text = content.text()
 
2199
            actual_sha = sha_strings(text)
 
2200
            if actual_sha != digest:
 
2201
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2202
        if multiple_versions:
 
2203
            return self._contents_map[requested_key]
 
2204
        else:
 
2205
            return content
 
2206
 
 
2207
    def _wire_bytes(self):
 
2208
        """Get the bytes to put on the wire for 'key'.
 
2209
 
 
2210
        The first collection of bytes asked for returns the serialised
 
2211
        raw_record_map and the additional details (key, parent) for key.
 
2212
        Subsequent calls return just the additional details (key, parent).
 
2213
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2214
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2215
 
 
2216
        :param key: A key from the content generator.
 
2217
        :return: Bytes to put on the wire.
 
2218
        """
 
2219
        lines = []
 
2220
        # kind marker for dispatch on the far side,
 
2221
        lines.append(b'knit-delta-closure')
 
2222
        # Annotated or not
 
2223
        if self.vf._factory.annotated:
 
2224
            lines.append(b'annotated')
 
2225
        else:
 
2226
            lines.append(b'')
 
2227
        # then the list of keys
 
2228
        lines.append(b'\t'.join(b'\x00'.join(key) for key in self.keys
 
2229
                                if key not in self.nonlocal_keys))
 
2230
        # then the _raw_record_map in serialised form:
 
2231
        map_byte_list = []
 
2232
        # for each item in the map:
 
2233
        # 1 line with key
 
2234
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2235
        # one line with method
 
2236
        # one line with noeol
 
2237
        # one line with next ('' for None)
 
2238
        # one line with byte count of the record bytes
 
2239
        # the record bytes
 
2240
        for key, (record_bytes, (method, noeol), next) in viewitems(
 
2241
                self._raw_record_map):
 
2242
            key_bytes = b'\x00'.join(key)
 
2243
            parents = self.global_map.get(key, None)
 
2244
            if parents is None:
 
2245
                parent_bytes = b'None:'
 
2246
            else:
 
2247
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
 
2248
            method_bytes = method.encode('ascii')
 
2249
            if noeol:
 
2250
                noeol_bytes = b"T"
 
2251
            else:
 
2252
                noeol_bytes = b"F"
 
2253
            if next:
 
2254
                next_bytes = b'\x00'.join(next)
 
2255
            else:
 
2256
                next_bytes = b''
 
2257
            map_byte_list.append(b'\n'.join(
 
2258
                [key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2259
                 b'%d' % len(record_bytes), record_bytes]))
 
2260
        map_bytes = b''.join(map_byte_list)
 
2261
        lines.append(map_bytes)
 
2262
        bytes = b'\n'.join(lines)
 
2263
        return bytes
 
2264
 
 
2265
 
 
2266
class _VFContentMapGenerator(_ContentMapGenerator):
 
2267
    """Content map generator reading from a VersionedFiles object."""
 
2268
 
 
2269
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2270
                 global_map=None, raw_record_map=None, ordering='unordered'):
 
2271
        """Create a _ContentMapGenerator.
 
2272
 
 
2273
        :param versioned_files: The versioned files that the texts are being
 
2274
            extracted from.
 
2275
        :param keys: The keys to produce content maps for.
 
2276
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2277
            which are known to not be in this knit, but rather in one of the
 
2278
            fallback knits.
 
2279
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2280
            This is required if get_record_stream() is to be used.
 
2281
        :param raw_record_map: A unparsed raw record map to use for answering
 
2282
            contents.
 
2283
        """
 
2284
        _ContentMapGenerator.__init__(self, ordering=ordering)
 
2285
        # The vf to source data from
 
2286
        self.vf = versioned_files
 
2287
        # The keys desired
 
2288
        self.keys = list(keys)
 
2289
        # Keys known to be in fallback vfs objects
 
2290
        if nonlocal_keys is None:
 
2291
            self.nonlocal_keys = set()
 
2292
        else:
 
2293
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2294
        # Parents data for keys to be returned in get_record_stream
 
2295
        self.global_map = global_map
 
2296
        # The chunked lists for self.keys in text form
 
2297
        self._text_map = {}
 
2298
        # A cache of KnitContent objects used in extracting texts.
 
2299
        self._contents_map = {}
 
2300
        # All the knit records needed to assemble the requested keys as full
 
2301
        # texts.
 
2302
        self._record_map = None
 
2303
        if raw_record_map is None:
 
2304
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2305
                                                                    allow_missing=True)
 
2306
        else:
 
2307
            self._raw_record_map = raw_record_map
 
2308
        # the factory for parsing records
 
2309
        self._factory = self.vf._factory
 
2310
 
 
2311
 
 
2312
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2313
    """Content map generator sourced from a network stream."""
 
2314
 
 
2315
    def __init__(self, bytes, line_end):
 
2316
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2317
        self._bytes = bytes
 
2318
        self.global_map = {}
 
2319
        self._raw_record_map = {}
 
2320
        self._contents_map = {}
 
2321
        self._record_map = None
 
2322
        self.nonlocal_keys = []
 
2323
        # Get access to record parsing facilities
 
2324
        self.vf = KnitVersionedFiles(None, None)
 
2325
        start = line_end
 
2326
        # Annotated or not
 
2327
        line_end = bytes.find(b'\n', start)
 
2328
        line = bytes[start:line_end]
 
2329
        start = line_end + 1
 
2330
        if line == b'annotated':
 
2331
            self._factory = KnitAnnotateFactory()
 
2332
        else:
 
2333
            self._factory = KnitPlainFactory()
 
2334
        # list of keys to emit in get_record_stream
 
2335
        line_end = bytes.find(b'\n', start)
 
2336
        line = bytes[start:line_end]
 
2337
        start = line_end + 1
 
2338
        self.keys = [
 
2339
            tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
 
2340
            if segment]
 
2341
        # now a loop until the end. XXX: It would be nice if this was just a
 
2342
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2343
        # there is a decent sized gap stopping that at the moment.
 
2344
        end = len(bytes)
 
2345
        while start < end:
 
2346
            # 1 line with key
 
2347
            line_end = bytes.find(b'\n', start)
 
2348
            key = tuple(bytes[start:line_end].split(b'\x00'))
 
2349
            start = line_end + 1
 
2350
            # 1 line with parents (None: for None, '' for ())
 
2351
            line_end = bytes.find(b'\n', start)
 
2352
            line = bytes[start:line_end]
 
2353
            if line == b'None:':
 
2354
                parents = None
 
2355
            else:
 
2356
                parents = tuple(
 
2357
                    tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
 
2358
                    if segment)
 
2359
            self.global_map[key] = parents
 
2360
            start = line_end + 1
 
2361
            # one line with method
 
2362
            line_end = bytes.find(b'\n', start)
 
2363
            line = bytes[start:line_end]
 
2364
            method = line.decode('ascii')
 
2365
            start = line_end + 1
 
2366
            # one line with noeol
 
2367
            line_end = bytes.find(b'\n', start)
 
2368
            line = bytes[start:line_end]
 
2369
            noeol = line == b"T"
 
2370
            start = line_end + 1
 
2371
            # one line with next (b'' for None)
 
2372
            line_end = bytes.find(b'\n', start)
 
2373
            line = bytes[start:line_end]
 
2374
            if not line:
 
2375
                next = None
 
2376
            else:
 
2377
                next = tuple(bytes[start:line_end].split(b'\x00'))
 
2378
            start = line_end + 1
 
2379
            # one line with byte count of the record bytes
 
2380
            line_end = bytes.find(b'\n', start)
 
2381
            line = bytes[start:line_end]
 
2382
            count = int(line)
 
2383
            start = line_end + 1
 
2384
            # the record bytes
 
2385
            record_bytes = bytes[start:start + count]
 
2386
            start = start + count
 
2387
            # put it in the map
 
2388
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2389
 
 
2390
    def get_record_stream(self):
 
2391
        """Get a record stream for for keys requested by the bytestream."""
 
2392
        first = True
 
2393
        for key in self.keys:
 
2394
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2395
            first = False
 
2396
 
 
2397
    def _wire_bytes(self):
 
2398
        return self._bytes
 
2399
 
 
2400
 
 
2401
class _KndxIndex(object):
 
2402
    """Manages knit index files
 
2403
 
 
2404
    The index is kept in memory and read on startup, to enable
 
2405
    fast lookups of revision information.  The cursor of the index
 
2406
    file is always pointing to the end, making it easy to append
 
2407
    entries.
 
2408
 
 
2409
    _cache is a cache for fast mapping from version id to a Index
 
2410
    object.
 
2411
 
 
2412
    _history is a cache for fast mapping from indexes to version ids.
 
2413
 
 
2414
    The index data format is dictionary compressed when it comes to
 
2415
    parent references; a index entry may only have parents that with a
 
2416
    lover index number.  As a result, the index is topological sorted.
 
2417
 
 
2418
    Duplicate entries may be written to the index for a single version id
 
2419
    if this is done then the latter one completely replaces the former:
 
2420
    this allows updates to correct version and parent information.
 
2421
    Note that the two entries may share the delta, and that successive
 
2422
    annotations and references MUST point to the first entry.
 
2423
 
 
2424
    The index file on disc contains a header, followed by one line per knit
 
2425
    record. The same revision can be present in an index file more than once.
 
2426
    The first occurrence gets assigned a sequence number starting from 0.
 
2427
 
 
2428
    The format of a single line is
 
2429
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
 
2430
    REVISION_ID is a utf8-encoded revision id
 
2431
    FLAGS is a comma separated list of flags about the record. Values include
 
2432
        no-eol, line-delta, fulltext.
 
2433
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
 
2434
        that the compressed data starts at.
 
2435
    LENGTH is the ascii representation of the length of the data file.
 
2436
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
 
2437
        REVISION_ID.
 
2438
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
 
2439
        revision id already in the knit that is a parent of REVISION_ID.
 
2440
    The ' :' marker is the end of record marker.
 
2441
 
 
2442
    partial writes:
 
2443
    when a write is interrupted to the index file, it will result in a line
 
2444
    that does not end in ' :'. If the ' :' is not present at the end of a line,
 
2445
    or at the end of the file, then the record that is missing it will be
 
2446
    ignored by the parser.
 
2447
 
 
2448
    When writing new records to the index file, the data is preceded by '\n'
 
2449
    to ensure that records always start on new lines even if the last write was
 
2450
    interrupted. As a result its normal for the last line in the index to be
 
2451
    missing a trailing newline. One can be added with no harmful effects.
 
2452
 
 
2453
    :ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
 
2454
        where prefix is e.g. the (fileid,) for .texts instances or () for
 
2455
        constant-mapped things like .revisions, and the old state is
 
2456
        tuple(history_vector, cache_dict).  This is used to prevent having an
 
2457
        ABI change with the C extension that reads .kndx files.
 
2458
    """
 
2459
 
 
2460
    HEADER = b"# bzr knit index 8\n"
 
2461
 
 
2462
    def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
 
2463
        """Create a _KndxIndex on transport using mapper."""
 
2464
        self._transport = transport
 
2465
        self._mapper = mapper
 
2466
        self._get_scope = get_scope
 
2467
        self._allow_writes = allow_writes
 
2468
        self._is_locked = is_locked
 
2469
        self._reset_cache()
 
2470
        self.has_graph = True
 
2471
 
 
2472
    def add_records(self, records, random_id=False, missing_compression_parents=False):
 
2473
        """Add multiple records to the index.
 
2474
 
 
2475
        :param records: a list of tuples:
 
2476
                         (key, options, access_memo, parents).
 
2477
        :param random_id: If True the ids being added were randomly generated
 
2478
            and no check for existence will be performed.
 
2479
        :param missing_compression_parents: If True the records being added are
 
2480
            only compressed against texts already in the index (or inside
 
2481
            records). If False the records all refer to unavailable texts (or
 
2482
            texts inside records) as compression parents.
 
2483
        """
 
2484
        if missing_compression_parents:
 
2485
            # It might be nice to get the edge of the records. But keys isn't
 
2486
            # _wrong_.
 
2487
            keys = sorted(record[0] for record in records)
 
2488
            raise errors.RevisionNotPresent(keys, self)
 
2489
        paths = {}
 
2490
        for record in records:
 
2491
            key = record[0]
 
2492
            prefix = key[:-1]
 
2493
            path = self._mapper.map(key) + '.kndx'
 
2494
            path_keys = paths.setdefault(path, (prefix, []))
 
2495
            path_keys[1].append(record)
 
2496
        for path in sorted(paths):
 
2497
            prefix, path_keys = paths[path]
 
2498
            self._load_prefixes([prefix])
 
2499
            lines = []
 
2500
            orig_history = self._kndx_cache[prefix][1][:]
 
2501
            orig_cache = self._kndx_cache[prefix][0].copy()
 
2502
 
 
2503
            try:
 
2504
                for key, options, (_, pos, size), parents in path_keys:
 
2505
                    if not all(isinstance(option, bytes) for option in options):
 
2506
                        raise TypeError(options)
 
2507
                    if parents is None:
 
2508
                        # kndx indices cannot be parentless.
 
2509
                        parents = ()
 
2510
                    line = b' '.join([
 
2511
                        b'\n'
 
2512
                        + key[-1], b','.join(options), b'%d' % pos, b'%d' % size,
 
2513
                        self._dictionary_compress(parents), b':'])
 
2514
                    if not isinstance(line, bytes):
 
2515
                        raise AssertionError(
 
2516
                            'data must be utf8 was %s' % type(line))
 
2517
                    lines.append(line)
 
2518
                    self._cache_key(key, options, pos, size, parents)
 
2519
                if len(orig_history):
 
2520
                    self._transport.append_bytes(path, b''.join(lines))
 
2521
                else:
 
2522
                    self._init_index(path, lines)
 
2523
            except:
 
2524
                # If any problems happen, restore the original values and re-raise
 
2525
                self._kndx_cache[prefix] = (orig_cache, orig_history)
 
2526
                raise
 
2527
 
 
2528
    def scan_unvalidated_index(self, graph_index):
 
2529
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2530
        # Because kndx files do not support atomic insertion via separate index
 
2531
        # files, they do not support this method.
 
2532
        raise NotImplementedError(self.scan_unvalidated_index)
 
2533
 
 
2534
    def get_missing_compression_parents(self):
 
2535
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2536
        # Because kndx files do not support atomic insertion via separate index
 
2537
        # files, they do not support this method.
 
2538
        raise NotImplementedError(self.get_missing_compression_parents)
 
2539
 
 
2540
    def _cache_key(self, key, options, pos, size, parent_keys):
 
2541
        """Cache a version record in the history array and index cache.
 
2542
 
 
2543
        This is inlined into _load_data for performance. KEEP IN SYNC.
 
2544
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
 
2545
         indexes).
 
2546
        """
 
2547
        prefix = key[:-1]
 
2548
        version_id = key[-1]
 
2549
        # last-element only for compatibilty with the C load_data.
 
2550
        parents = tuple(parent[-1] for parent in parent_keys)
 
2551
        for parent in parent_keys:
 
2552
            if parent[:-1] != prefix:
 
2553
                raise ValueError("mismatched prefixes for %r, %r" % (
 
2554
                    key, parent_keys))
 
2555
        cache, history = self._kndx_cache[prefix]
 
2556
        # only want the _history index to reference the 1st index entry
 
2557
        # for version_id
 
2558
        if version_id not in cache:
 
2559
            index = len(history)
 
2560
            history.append(version_id)
 
2561
        else:
 
2562
            index = cache[version_id][5]
 
2563
        cache[version_id] = (version_id,
 
2564
                             options,
 
2565
                             pos,
 
2566
                             size,
 
2567
                             parents,
 
2568
                             index)
 
2569
 
 
2570
    def check_header(self, fp):
 
2571
        line = fp.readline()
 
2572
        if line == b'':
 
2573
            # An empty file can actually be treated as though the file doesn't
 
2574
            # exist yet.
 
2575
            raise errors.NoSuchFile(self)
 
2576
        if line != self.HEADER:
 
2577
            raise KnitHeaderError(badline=line, filename=self)
 
2578
 
 
2579
    def _check_read(self):
 
2580
        if not self._is_locked():
 
2581
            raise errors.ObjectNotLocked(self)
 
2582
        if self._get_scope() != self._scope:
 
2583
            self._reset_cache()
 
2584
 
 
2585
    def _check_write_ok(self):
 
2586
        """Assert if not writes are permitted."""
 
2587
        if not self._is_locked():
 
2588
            raise errors.ObjectNotLocked(self)
 
2589
        if self._get_scope() != self._scope:
 
2590
            self._reset_cache()
 
2591
        if self._mode != 'w':
 
2592
            raise errors.ReadOnlyObjectDirtiedError(self)
 
2593
 
 
2594
    def get_build_details(self, keys):
 
2595
        """Get the method, index_memo and compression parent for keys.
 
2596
 
 
2597
        Ghosts are omitted from the result.
 
2598
 
 
2599
        :param keys: An iterable of keys.
 
2600
        :return: A dict of key:(index_memo, compression_parent, parents,
 
2601
            record_details).
 
2602
            index_memo
 
2603
                opaque structure to pass to read_records to extract the raw
 
2604
                data
 
2605
            compression_parent
 
2606
                Content that this record is built upon, may be None
 
2607
            parents
 
2608
                Logical parents of this node
 
2609
            record_details
 
2610
                extra information about the content which needs to be passed to
 
2611
                Factory.parse_record
 
2612
        """
 
2613
        parent_map = self.get_parent_map(keys)
 
2614
        result = {}
 
2615
        for key in keys:
 
2616
            if key not in parent_map:
 
2617
                continue  # Ghost
 
2618
            method = self.get_method(key)
 
2619
            if not isinstance(method, str):
 
2620
                raise TypeError(method)
 
2621
            parents = parent_map[key]
 
2622
            if method == 'fulltext':
 
2623
                compression_parent = None
 
2624
            else:
 
2625
                compression_parent = parents[0]
 
2626
            noeol = b'no-eol' in self.get_options(key)
 
2627
            index_memo = self.get_position(key)
 
2628
            result[key] = (index_memo, compression_parent,
 
2629
                           parents, (method, noeol))
 
2630
        return result
 
2631
 
 
2632
    def get_method(self, key):
 
2633
        """Return compression method of specified key."""
 
2634
        options = self.get_options(key)
 
2635
        if b'fulltext' in options:
 
2636
            return 'fulltext'
 
2637
        elif b'line-delta' in options:
 
2638
            return 'line-delta'
 
2639
        else:
 
2640
            raise KnitIndexUnknownMethod(self, options)
 
2641
 
 
2642
    def get_options(self, key):
 
2643
        """Return a list representing options.
 
2644
 
 
2645
        e.g. ['foo', 'bar']
 
2646
        """
 
2647
        prefix, suffix = self._split_key(key)
 
2648
        self._load_prefixes([prefix])
 
2649
        try:
 
2650
            return self._kndx_cache[prefix][0][suffix][1]
 
2651
        except KeyError:
 
2652
            raise RevisionNotPresent(key, self)
 
2653
 
 
2654
    def find_ancestry(self, keys):
 
2655
        """See CombinedGraphIndex.find_ancestry()"""
 
2656
        prefixes = set(key[:-1] for key in keys)
 
2657
        self._load_prefixes(prefixes)
 
2658
        result = {}
 
2659
        parent_map = {}
 
2660
        missing_keys = set()
 
2661
        pending_keys = list(keys)
 
2662
        # This assumes that keys will not reference parents in a different
 
2663
        # prefix, which is accurate so far.
 
2664
        while pending_keys:
 
2665
            key = pending_keys.pop()
 
2666
            if key in parent_map:
 
2667
                continue
 
2668
            prefix = key[:-1]
 
2669
            try:
 
2670
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2671
            except KeyError:
 
2672
                missing_keys.add(key)
 
2673
            else:
 
2674
                parent_keys = tuple([prefix + (suffix,)
 
2675
                                     for suffix in suffix_parents])
 
2676
                parent_map[key] = parent_keys
 
2677
                pending_keys.extend([p for p in parent_keys
 
2678
                                     if p not in parent_map])
 
2679
        return parent_map, missing_keys
 
2680
 
 
2681
    def get_parent_map(self, keys):
 
2682
        """Get a map of the parents of keys.
 
2683
 
 
2684
        :param keys: The keys to look up parents for.
 
2685
        :return: A mapping from keys to parents. Absent keys are absent from
 
2686
            the mapping.
 
2687
        """
 
2688
        # Parse what we need to up front, this potentially trades off I/O
 
2689
        # locality (.kndx and .knit in the same block group for the same file
 
2690
        # id) for less checking in inner loops.
 
2691
        prefixes = set(key[:-1] for key in keys)
 
2692
        self._load_prefixes(prefixes)
 
2693
        result = {}
 
2694
        for key in keys:
 
2695
            prefix = key[:-1]
 
2696
            try:
 
2697
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2698
            except KeyError:
 
2699
                pass
 
2700
            else:
 
2701
                result[key] = tuple(prefix + (suffix,) for
 
2702
                                    suffix in suffix_parents)
 
2703
        return result
 
2704
 
 
2705
    def get_position(self, key):
 
2706
        """Return details needed to access the version.
 
2707
 
 
2708
        :return: a tuple (key, data position, size) to hand to the access
 
2709
            logic to get the record.
 
2710
        """
 
2711
        prefix, suffix = self._split_key(key)
 
2712
        self._load_prefixes([prefix])
 
2713
        entry = self._kndx_cache[prefix][0][suffix]
 
2714
        return key, entry[2], entry[3]
 
2715
 
 
2716
    __contains__ = _mod_index._has_key_from_parent_map
 
2717
 
 
2718
    def _init_index(self, path, extra_lines=[]):
 
2719
        """Initialize an index."""
 
2720
        sio = BytesIO()
 
2721
        sio.write(self.HEADER)
 
2722
        sio.writelines(extra_lines)
 
2723
        sio.seek(0)
 
2724
        self._transport.put_file_non_atomic(path, sio,
 
2725
                                            create_parent_dir=True)
 
2726
        # self._create_parent_dir)
 
2727
        # mode=self._file_mode,
 
2728
        # dir_mode=self._dir_mode)
 
2729
 
 
2730
    def keys(self):
 
2731
        """Get all the keys in the collection.
 
2732
 
 
2733
        The keys are not ordered.
 
2734
        """
 
2735
        result = set()
 
2736
        # Identify all key prefixes.
 
2737
        # XXX: A bit hacky, needs polish.
 
2738
        if isinstance(self._mapper, ConstantMapper):
 
2739
            prefixes = [()]
 
2740
        else:
 
2741
            relpaths = set()
 
2742
            for quoted_relpath in self._transport.iter_files_recursive():
 
2743
                path, ext = os.path.splitext(quoted_relpath)
 
2744
                relpaths.add(path)
 
2745
            prefixes = [self._mapper.unmap(path) for path in relpaths]
 
2746
        self._load_prefixes(prefixes)
 
2747
        for prefix in prefixes:
 
2748
            for suffix in self._kndx_cache[prefix][1]:
 
2749
                result.add(prefix + (suffix,))
 
2750
        return result
 
2751
 
 
2752
    def _load_prefixes(self, prefixes):
 
2753
        """Load the indices for prefixes."""
 
2754
        self._check_read()
 
2755
        for prefix in prefixes:
 
2756
            if prefix not in self._kndx_cache:
 
2757
                # the load_data interface writes to these variables.
 
2758
                self._cache = {}
 
2759
                self._history = []
 
2760
                self._filename = prefix
 
2761
                try:
 
2762
                    path = self._mapper.map(prefix) + '.kndx'
 
2763
                    with self._transport.get(path) as fp:
 
2764
                        # _load_data may raise NoSuchFile if the target knit is
 
2765
                        # completely empty.
 
2766
                        _load_data(self, fp)
 
2767
                    self._kndx_cache[prefix] = (self._cache, self._history)
 
2768
                    del self._cache
 
2769
                    del self._filename
 
2770
                    del self._history
 
2771
                except NoSuchFile:
 
2772
                    self._kndx_cache[prefix] = ({}, [])
 
2773
                    if isinstance(self._mapper, ConstantMapper):
 
2774
                        # preserve behaviour for revisions.kndx etc.
 
2775
                        self._init_index(path)
 
2776
                    del self._cache
 
2777
                    del self._filename
 
2778
                    del self._history
 
2779
 
 
2780
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2781
 
 
2782
    def _partition_keys(self, keys):
 
2783
        """Turn keys into a dict of prefix:suffix_list."""
 
2784
        result = {}
 
2785
        for key in keys:
 
2786
            prefix_keys = result.setdefault(key[:-1], [])
 
2787
            prefix_keys.append(key[-1])
 
2788
        return result
 
2789
 
 
2790
    def _dictionary_compress(self, keys):
 
2791
        """Dictionary compress keys.
 
2792
 
 
2793
        :param keys: The keys to generate references to.
 
2794
        :return: A string representation of keys. keys which are present are
 
2795
            dictionary compressed, and others are emitted as fulltext with a
 
2796
            '.' prefix.
 
2797
        """
 
2798
        if not keys:
 
2799
            return b''
 
2800
        result_list = []
 
2801
        prefix = keys[0][:-1]
 
2802
        cache = self._kndx_cache[prefix][0]
 
2803
        for key in keys:
 
2804
            if key[:-1] != prefix:
 
2805
                # kndx indices cannot refer across partitioned storage.
 
2806
                raise ValueError("mismatched prefixes for %r" % keys)
 
2807
            if key[-1] in cache:
 
2808
                # -- inlined lookup() --
 
2809
                result_list.append(b'%d' % cache[key[-1]][5])
 
2810
                # -- end lookup () --
 
2811
            else:
 
2812
                result_list.append(b'.' + key[-1])
 
2813
        return b' '.join(result_list)
 
2814
 
 
2815
    def _reset_cache(self):
 
2816
        # Possibly this should be a LRU cache. A dictionary from key_prefix to
 
2817
        # (cache_dict, history_vector) for parsed kndx files.
 
2818
        self._kndx_cache = {}
 
2819
        self._scope = self._get_scope()
 
2820
        allow_writes = self._allow_writes()
 
2821
        if allow_writes:
 
2822
            self._mode = 'w'
 
2823
        else:
 
2824
            self._mode = 'r'
 
2825
 
 
2826
    def _sort_keys_by_io(self, keys, positions):
 
2827
        """Figure out an optimal order to read the records for the given keys.
 
2828
 
 
2829
        Sort keys, grouped by index and sorted by position.
 
2830
 
 
2831
        :param keys: A list of keys whose records we want to read. This will be
 
2832
            sorted 'in-place'.
 
2833
        :param positions: A dict, such as the one returned by
 
2834
            _get_components_positions()
 
2835
        :return: None
 
2836
        """
 
2837
        def get_sort_key(key):
 
2838
            index_memo = positions[key][1]
 
2839
            # Group by prefix and position. index_memo[0] is the key, so it is
 
2840
            # (file_id, revision_id) and we don't want to sort on revision_id,
 
2841
            # index_memo[1] is the position, and index_memo[2] is the size,
 
2842
            # which doesn't matter for the sort
 
2843
            return index_memo[0][:-1], index_memo[1]
 
2844
        return keys.sort(key=get_sort_key)
 
2845
 
 
2846
    _get_total_build_size = _get_total_build_size
 
2847
 
 
2848
    def _split_key(self, key):
 
2849
        """Split key into a prefix and suffix."""
 
2850
        # GZ 2018-07-03: This is intentionally either a sequence or bytes?
 
2851
        if isinstance(key, bytes):
 
2852
            return key[:-1], key[-1:]
 
2853
        return key[:-1], key[-1]
 
2854
 
 
2855
 
 
2856
class _KnitGraphIndex(object):
 
2857
    """A KnitVersionedFiles index layered on GraphIndex."""
 
2858
 
 
2859
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
 
2860
                 add_callback=None, track_external_parent_refs=False):
 
2861
        """Construct a KnitGraphIndex on a graph_index.
 
2862
 
 
2863
        :param graph_index: An implementation of breezy.index.GraphIndex.
 
2864
        :param is_locked: A callback to check whether the object should answer
 
2865
            queries.
 
2866
        :param deltas: Allow delta-compressed records.
 
2867
        :param parents: If True, record knits parents, if not do not record
 
2868
            parents.
 
2869
        :param add_callback: If not None, allow additions to the index and call
 
2870
            this callback with a list of added GraphIndex nodes:
 
2871
            [(node, value, node_refs), ...]
 
2872
        :param is_locked: A callback, returns True if the index is locked and
 
2873
            thus usable.
 
2874
        :param track_external_parent_refs: If True, record all external parent
 
2875
            references parents from added records.  These can be retrieved
 
2876
            later by calling get_missing_parents().
 
2877
        """
 
2878
        self._add_callback = add_callback
 
2879
        self._graph_index = graph_index
 
2880
        self._deltas = deltas
 
2881
        self._parents = parents
 
2882
        if deltas and not parents:
 
2883
            # XXX: TODO: Delta tree and parent graph should be conceptually
 
2884
            # separate.
 
2885
            raise KnitCorrupt(self, "Cannot do delta compression without "
 
2886
                              "parent tracking.")
 
2887
        self.has_graph = parents
 
2888
        self._is_locked = is_locked
 
2889
        self._missing_compression_parents = set()
 
2890
        if track_external_parent_refs:
 
2891
            self._key_dependencies = _KeyRefs()
 
2892
        else:
 
2893
            self._key_dependencies = None
 
2894
 
 
2895
    def __repr__(self):
 
2896
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
 
2897
 
 
2898
    def add_records(self, records, random_id=False,
 
2899
                    missing_compression_parents=False):
 
2900
        """Add multiple records to the index.
 
2901
 
 
2902
        This function does not insert data into the Immutable GraphIndex
 
2903
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
2904
        the caller and checks that it is safe to insert then calls
 
2905
        self._add_callback with the prepared GraphIndex nodes.
 
2906
 
 
2907
        :param records: a list of tuples:
 
2908
                         (key, options, access_memo, parents).
 
2909
        :param random_id: If True the ids being added were randomly generated
 
2910
            and no check for existence will be performed.
 
2911
        :param missing_compression_parents: If True the records being added are
 
2912
            only compressed against texts already in the index (or inside
 
2913
            records). If False the records all refer to unavailable texts (or
 
2914
            texts inside records) as compression parents.
 
2915
        """
 
2916
        if not self._add_callback:
 
2917
            raise errors.ReadOnlyError(self)
 
2918
        # we hope there are no repositories with inconsistent parentage
 
2919
        # anymore.
 
2920
 
 
2921
        keys = {}
 
2922
        compression_parents = set()
 
2923
        key_dependencies = self._key_dependencies
 
2924
        for (key, options, access_memo, parents) in records:
 
2925
            if self._parents:
 
2926
                parents = tuple(parents)
 
2927
                if key_dependencies is not None:
 
2928
                    key_dependencies.add_references(key, parents)
 
2929
            index, pos, size = access_memo
 
2930
            if b'no-eol' in options:
 
2931
                value = b'N'
 
2932
            else:
 
2933
                value = b' '
 
2934
            value += b"%d %d" % (pos, size)
 
2935
            if not self._deltas:
 
2936
                if b'line-delta' in options:
 
2937
                    raise KnitCorrupt(
 
2938
                        self, "attempt to add line-delta in non-delta knit")
 
2939
            if self._parents:
 
2940
                if self._deltas:
 
2941
                    if b'line-delta' in options:
 
2942
                        node_refs = (parents, (parents[0],))
 
2943
                        if missing_compression_parents:
 
2944
                            compression_parents.add(parents[0])
 
2945
                    else:
 
2946
                        node_refs = (parents, ())
 
2947
                else:
 
2948
                    node_refs = (parents, )
 
2949
            else:
 
2950
                if parents:
 
2951
                    raise KnitCorrupt(self, "attempt to add node with parents "
 
2952
                                      "in parentless index.")
 
2953
                node_refs = ()
 
2954
            keys[key] = (value, node_refs)
 
2955
        # check for dups
 
2956
        if not random_id:
 
2957
            present_nodes = self._get_entries(keys)
 
2958
            for (index, key, value, node_refs) in present_nodes:
 
2959
                parents = node_refs[:1]
 
2960
                # Sometimes these are passed as a list rather than a tuple
 
2961
                passed = static_tuple.as_tuples(keys[key])
 
2962
                passed_parents = passed[1][:1]
 
2963
                if (value[0:1] != keys[key][0][0:1]
 
2964
                        or parents != passed_parents):
 
2965
                    node_refs = static_tuple.as_tuples(node_refs)
 
2966
                    raise KnitCorrupt(self, "inconsistent details in add_records"
 
2967
                                      ": %s %s" % ((value, node_refs), passed))
 
2968
                del keys[key]
 
2969
        result = []
 
2970
        if self._parents:
 
2971
            for key, (value, node_refs) in viewitems(keys):
 
2972
                result.append((key, value, node_refs))
 
2973
        else:
 
2974
            for key, (value, node_refs) in viewitems(keys):
 
2975
                result.append((key, value))
 
2976
        self._add_callback(result)
 
2977
        if missing_compression_parents:
 
2978
            # This may appear to be incorrect (it does not check for
 
2979
            # compression parents that are in the existing graph index),
 
2980
            # but such records won't have been buffered, so this is
 
2981
            # actually correct: every entry when
 
2982
            # missing_compression_parents==True either has a missing parent, or
 
2983
            # a parent that is one of the keys in records.
 
2984
            compression_parents.difference_update(keys)
 
2985
            self._missing_compression_parents.update(compression_parents)
 
2986
        # Adding records may have satisfied missing compression parents.
 
2987
        self._missing_compression_parents.difference_update(keys)
 
2988
 
 
2989
    def scan_unvalidated_index(self, graph_index):
 
2990
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2991
 
 
2992
        This allows this _KnitGraphIndex to keep track of any missing
 
2993
        compression parents we may want to have filled in to make those
 
2994
        indices valid.
 
2995
 
 
2996
        :param graph_index: A GraphIndex
 
2997
        """
 
2998
        if self._deltas:
 
2999
            new_missing = graph_index.external_references(ref_list_num=1)
 
3000
            new_missing.difference_update(self.get_parent_map(new_missing))
 
3001
            self._missing_compression_parents.update(new_missing)
 
3002
        if self._key_dependencies is not None:
 
3003
            # Add parent refs from graph_index (and discard parent refs that
 
3004
            # the graph_index has).
 
3005
            for node in graph_index.iter_all_entries():
 
3006
                self._key_dependencies.add_references(node[1], node[3][0])
 
3007
 
 
3008
    def get_missing_compression_parents(self):
 
3009
        """Return the keys of missing compression parents.
 
3010
 
 
3011
        Missing compression parents occur when a record stream was missing
 
3012
        basis texts, or a index was scanned that had missing basis texts.
 
3013
        """
 
3014
        return frozenset(self._missing_compression_parents)
 
3015
 
 
3016
    def get_missing_parents(self):
 
3017
        """Return the keys of missing parents."""
 
3018
        # If updating this, you should also update
 
3019
        # groupcompress._GCGraphIndex.get_missing_parents
 
3020
        # We may have false positives, so filter those out.
 
3021
        self._key_dependencies.satisfy_refs_for_keys(
 
3022
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
3023
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
3024
 
 
3025
    def _check_read(self):
 
3026
        """raise if reads are not permitted."""
 
3027
        if not self._is_locked():
 
3028
            raise errors.ObjectNotLocked(self)
 
3029
 
 
3030
    def _check_write_ok(self):
 
3031
        """Assert if writes are not permitted."""
 
3032
        if not self._is_locked():
 
3033
            raise errors.ObjectNotLocked(self)
 
3034
 
 
3035
    def _compression_parent(self, an_entry):
 
3036
        # return the key that an_entry is compressed against, or None
 
3037
        # Grab the second parent list (as deltas implies parents currently)
 
3038
        compression_parents = an_entry[3][1]
 
3039
        if not compression_parents:
 
3040
            return None
 
3041
        if len(compression_parents) != 1:
 
3042
            raise AssertionError(
 
3043
                "Too many compression parents: %r" % compression_parents)
 
3044
        return compression_parents[0]
 
3045
 
 
3046
    def get_build_details(self, keys):
 
3047
        """Get the method, index_memo and compression parent for version_ids.
 
3048
 
 
3049
        Ghosts are omitted from the result.
 
3050
 
 
3051
        :param keys: An iterable of keys.
 
3052
        :return: A dict of key:
 
3053
            (index_memo, compression_parent, parents, record_details).
 
3054
            index_memo
 
3055
                opaque structure to pass to read_records to extract the raw
 
3056
                data
 
3057
            compression_parent
 
3058
                Content that this record is built upon, may be None
 
3059
            parents
 
3060
                Logical parents of this node
 
3061
            record_details
 
3062
                extra information about the content which needs to be passed to
 
3063
                Factory.parse_record
 
3064
        """
 
3065
        self._check_read()
 
3066
        result = {}
 
3067
        entries = self._get_entries(keys, False)
 
3068
        for entry in entries:
 
3069
            key = entry[1]
 
3070
            if not self._parents:
 
3071
                parents = ()
 
3072
            else:
 
3073
                parents = entry[3][0]
 
3074
            if not self._deltas:
 
3075
                compression_parent_key = None
 
3076
            else:
 
3077
                compression_parent_key = self._compression_parent(entry)
 
3078
            noeol = (entry[2][0:1] == b'N')
 
3079
            if compression_parent_key:
 
3080
                method = 'line-delta'
 
3081
            else:
 
3082
                method = 'fulltext'
 
3083
            result[key] = (self._node_to_position(entry),
 
3084
                           compression_parent_key, parents,
 
3085
                           (method, noeol))
 
3086
        return result
 
3087
 
 
3088
    def _get_entries(self, keys, check_present=False):
 
3089
        """Get the entries for keys.
 
3090
 
 
3091
        :param keys: An iterable of index key tuples.
 
3092
        """
 
3093
        keys = set(keys)
 
3094
        found_keys = set()
 
3095
        if self._parents:
 
3096
            for node in self._graph_index.iter_entries(keys):
 
3097
                yield node
 
3098
                found_keys.add(node[1])
 
3099
        else:
 
3100
            # adapt parentless index to the rest of the code.
 
3101
            for node in self._graph_index.iter_entries(keys):
 
3102
                yield node[0], node[1], node[2], ()
 
3103
                found_keys.add(node[1])
 
3104
        if check_present:
 
3105
            missing_keys = keys.difference(found_keys)
 
3106
            if missing_keys:
 
3107
                raise RevisionNotPresent(missing_keys.pop(), self)
 
3108
 
 
3109
    def get_method(self, key):
 
3110
        """Return compression method of specified key."""
 
3111
        return self._get_method(self._get_node(key))
 
3112
 
 
3113
    def _get_method(self, node):
 
3114
        if not self._deltas:
 
3115
            return 'fulltext'
 
3116
        if self._compression_parent(node):
 
3117
            return 'line-delta'
 
3118
        else:
 
3119
            return 'fulltext'
 
3120
 
 
3121
    def _get_node(self, key):
 
3122
        try:
 
3123
            return list(self._get_entries([key]))[0]
 
3124
        except IndexError:
 
3125
            raise RevisionNotPresent(key, self)
 
3126
 
 
3127
    def get_options(self, key):
 
3128
        """Return a list representing options.
 
3129
 
 
3130
        e.g. ['foo', 'bar']
 
3131
        """
 
3132
        node = self._get_node(key)
 
3133
        options = [self._get_method(node).encode('ascii')]
 
3134
        if node[2][0:1] == b'N':
 
3135
            options.append(b'no-eol')
 
3136
        return options
 
3137
 
 
3138
    def find_ancestry(self, keys):
 
3139
        """See CombinedGraphIndex.find_ancestry()"""
 
3140
        return self._graph_index.find_ancestry(keys, 0)
 
3141
 
 
3142
    def get_parent_map(self, keys):
 
3143
        """Get a map of the parents of keys.
 
3144
 
 
3145
        :param keys: The keys to look up parents for.
 
3146
        :return: A mapping from keys to parents. Absent keys are absent from
 
3147
            the mapping.
 
3148
        """
 
3149
        self._check_read()
 
3150
        nodes = self._get_entries(keys)
 
3151
        result = {}
 
3152
        if self._parents:
 
3153
            for node in nodes:
 
3154
                result[node[1]] = node[3][0]
 
3155
        else:
 
3156
            for node in nodes:
 
3157
                result[node[1]] = None
 
3158
        return result
 
3159
 
 
3160
    def get_position(self, key):
 
3161
        """Return details needed to access the version.
 
3162
 
 
3163
        :return: a tuple (index, data position, size) to hand to the access
 
3164
            logic to get the record.
 
3165
        """
 
3166
        node = self._get_node(key)
 
3167
        return self._node_to_position(node)
 
3168
 
 
3169
    __contains__ = _mod_index._has_key_from_parent_map
 
3170
 
 
3171
    def keys(self):
 
3172
        """Get all the keys in the collection.
 
3173
 
 
3174
        The keys are not ordered.
 
3175
        """
 
3176
        self._check_read()
 
3177
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
3178
 
 
3179
    missing_keys = _mod_index._missing_keys_from_parent_map
 
3180
 
 
3181
    def _node_to_position(self, node):
 
3182
        """Convert an index value to position details."""
 
3183
        bits = node[2][1:].split(b' ')
 
3184
        return node[0], int(bits[0]), int(bits[1])
 
3185
 
 
3186
    def _sort_keys_by_io(self, keys, positions):
 
3187
        """Figure out an optimal order to read the records for the given keys.
 
3188
 
 
3189
        Sort keys, grouped by index and sorted by position.
 
3190
 
 
3191
        :param keys: A list of keys whose records we want to read. This will be
 
3192
            sorted 'in-place'.
 
3193
        :param positions: A dict, such as the one returned by
 
3194
            _get_components_positions()
 
3195
        :return: None
 
3196
        """
 
3197
        def get_index_memo(key):
 
3198
            # index_memo is at offset [1]. It is made up of (GraphIndex,
 
3199
            # position, size). GI is an object, which will be unique for each
 
3200
            # pack file. This causes us to group by pack file, then sort by
 
3201
            # position. Size doesn't matter, but it isn't worth breaking up the
 
3202
            # tuple.
 
3203
            return positions[key][1]
 
3204
        return keys.sort(key=get_index_memo)
 
3205
 
 
3206
    _get_total_build_size = _get_total_build_size
 
3207
 
 
3208
 
 
3209
class _KnitKeyAccess(object):
 
3210
    """Access to records in .knit files."""
 
3211
 
 
3212
    def __init__(self, transport, mapper):
 
3213
        """Create a _KnitKeyAccess with transport and mapper.
 
3214
 
 
3215
        :param transport: The transport the access object is rooted at.
 
3216
        :param mapper: The mapper used to map keys to .knit files.
 
3217
        """
 
3218
        self._transport = transport
 
3219
        self._mapper = mapper
 
3220
 
 
3221
    def add_raw_records(self, key_sizes, raw_data):
 
3222
        """Add raw knit bytes to a storage area.
 
3223
 
 
3224
        The data is spooled to the container writer in one bytes-record per
 
3225
        raw data item.
 
3226
 
 
3227
        :param sizes: An iterable of tuples containing the key and size of each
 
3228
            raw data segment.
 
3229
        :param raw_data: A bytestring containing the data.
 
3230
        :return: A list of memos to retrieve the record later. Each memo is an
 
3231
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
 
3232
            length), where the key is the record key.
 
3233
        """
 
3234
        if not isinstance(raw_data, bytes):
 
3235
            raise AssertionError(
 
3236
                'data must be plain bytes was %s' % type(raw_data))
 
3237
        result = []
 
3238
        offset = 0
 
3239
        # TODO: This can be tuned for writing to sftp and other servers where
 
3240
        # append() is relatively expensive by grouping the writes to each key
 
3241
        # prefix.
 
3242
        for key, size in key_sizes:
 
3243
            path = self._mapper.map(key)
 
3244
            try:
 
3245
                base = self._transport.append_bytes(path + '.knit',
 
3246
                                                    raw_data[offset:offset + size])
 
3247
            except errors.NoSuchFile:
 
3248
                self._transport.mkdir(osutils.dirname(path))
 
3249
                base = self._transport.append_bytes(path + '.knit',
 
3250
                                                    raw_data[offset:offset + size])
 
3251
            # if base == 0:
 
3252
            # chmod.
 
3253
            offset += size
 
3254
            result.append((key, base, size))
 
3255
        return result
 
3256
 
 
3257
    def flush(self):
 
3258
        """Flush pending writes on this access object.
 
3259
 
 
3260
        For .knit files this is a no-op.
 
3261
        """
 
3262
        pass
 
3263
 
 
3264
    def get_raw_records(self, memos_for_retrieval):
 
3265
        """Get the raw bytes for a records.
 
3266
 
 
3267
        :param memos_for_retrieval: An iterable containing the access memo for
 
3268
            retrieving the bytes.
 
3269
        :return: An iterator over the bytes of the records.
 
3270
        """
 
3271
        # first pass, group into same-index request to minimise readv's issued.
 
3272
        request_lists = []
 
3273
        current_prefix = None
 
3274
        for (key, offset, length) in memos_for_retrieval:
 
3275
            if current_prefix == key[:-1]:
 
3276
                current_list.append((offset, length))
 
3277
            else:
 
3278
                if current_prefix is not None:
 
3279
                    request_lists.append((current_prefix, current_list))
 
3280
                current_prefix = key[:-1]
 
3281
                current_list = [(offset, length)]
 
3282
        # handle the last entry
 
3283
        if current_prefix is not None:
 
3284
            request_lists.append((current_prefix, current_list))
 
3285
        for prefix, read_vector in request_lists:
 
3286
            path = self._mapper.map(prefix) + '.knit'
 
3287
            for pos, data in self._transport.readv(path, read_vector):
 
3288
                yield data
 
3289
 
 
3290
 
 
3291
def annotate_knit(knit, revision_id):
 
3292
    """Annotate a knit with no cached annotations.
 
3293
 
 
3294
    This implementation is for knits with no cached annotations.
 
3295
    It will work for knits with cached annotations, but this is not
 
3296
    recommended.
 
3297
    """
 
3298
    annotator = _KnitAnnotator(knit)
 
3299
    return iter(annotator.annotate_flat(revision_id))
 
3300
 
 
3301
 
 
3302
class _KnitAnnotator(annotate.Annotator):
 
3303
    """Build up the annotations for a text."""
 
3304
 
 
3305
    def __init__(self, vf):
 
3306
        annotate.Annotator.__init__(self, vf)
 
3307
 
 
3308
        # TODO: handle Nodes which cannot be extracted
 
3309
        # self._ghosts = set()
 
3310
 
 
3311
        # Map from (key, parent_key) => matching_blocks, should be 'use once'
 
3312
        self._matching_blocks = {}
 
3313
 
 
3314
        # KnitContent objects
 
3315
        self._content_objects = {}
 
3316
        # The number of children that depend on this fulltext content object
 
3317
        self._num_compression_children = {}
 
3318
        # Delta records that need their compression parent before they can be
 
3319
        # expanded
 
3320
        self._pending_deltas = {}
 
3321
        # Fulltext records that are waiting for their parents fulltexts before
 
3322
        # they can be yielded for annotation
 
3323
        self._pending_annotation = {}
 
3324
 
 
3325
        self._all_build_details = {}
 
3326
 
 
3327
    def _get_build_graph(self, key):
 
3328
        """Get the graphs for building texts and annotations.
 
3329
 
 
3330
        The data you need for creating a full text may be different than the
 
3331
        data you need to annotate that text. (At a minimum, you need both
 
3332
        parents to create an annotation, but only need 1 parent to generate the
 
3333
        fulltext.)
 
3334
 
 
3335
        :return: A list of (key, index_memo) records, suitable for
 
3336
            passing to read_records_iter to start reading in the raw data from
 
3337
            the pack file.
 
3338
        """
 
3339
        pending = {key}
 
3340
        records = []
 
3341
        ann_keys = set()
 
3342
        self._num_needed_children[key] = 1
 
3343
        while pending:
 
3344
            # get all pending nodes
 
3345
            this_iteration = pending
 
3346
            build_details = self._vf._index.get_build_details(this_iteration)
 
3347
            self._all_build_details.update(build_details)
 
3348
            # new_nodes = self._vf._index._get_entries(this_iteration)
 
3349
            pending = set()
 
3350
            for key, details in viewitems(build_details):
 
3351
                (index_memo, compression_parent, parent_keys,
 
3352
                 record_details) = details
 
3353
                self._parent_map[key] = parent_keys
 
3354
                self._heads_provider = None
 
3355
                records.append((key, index_memo))
 
3356
                # Do we actually need to check _annotated_lines?
 
3357
                pending.update([p for p in parent_keys
 
3358
                                if p not in self._all_build_details])
 
3359
                if parent_keys:
 
3360
                    for parent_key in parent_keys:
 
3361
                        if parent_key in self._num_needed_children:
 
3362
                            self._num_needed_children[parent_key] += 1
 
3363
                        else:
 
3364
                            self._num_needed_children[parent_key] = 1
 
3365
                if compression_parent:
 
3366
                    if compression_parent in self._num_compression_children:
 
3367
                        self._num_compression_children[compression_parent] += 1
 
3368
                    else:
 
3369
                        self._num_compression_children[compression_parent] = 1
 
3370
 
 
3371
            missing_versions = this_iteration.difference(build_details)
 
3372
            if missing_versions:
 
3373
                for key in missing_versions:
 
3374
                    if key in self._parent_map and key in self._text_cache:
 
3375
                        # We already have this text ready, we just need to
 
3376
                        # yield it later so we get it annotated
 
3377
                        ann_keys.add(key)
 
3378
                        parent_keys = self._parent_map[key]
 
3379
                        for parent_key in parent_keys:
 
3380
                            if parent_key in self._num_needed_children:
 
3381
                                self._num_needed_children[parent_key] += 1
 
3382
                            else:
 
3383
                                self._num_needed_children[parent_key] = 1
 
3384
                        pending.update([p for p in parent_keys
 
3385
                                        if p not in self._all_build_details])
 
3386
                    else:
 
3387
                        raise errors.RevisionNotPresent(key, self._vf)
 
3388
        # Generally we will want to read the records in reverse order, because
 
3389
        # we find the parent nodes after the children
 
3390
        records.reverse()
 
3391
        return records, ann_keys
 
3392
 
 
3393
    def _get_needed_texts(self, key, pb=None):
 
3394
        # if True or len(self._vf._immediate_fallback_vfs) > 0:
 
3395
        if len(self._vf._immediate_fallback_vfs) > 0:
 
3396
            # If we have fallbacks, go to the generic path
 
3397
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
 
3398
                yield v
 
3399
            return
 
3400
        while True:
 
3401
            try:
 
3402
                records, ann_keys = self._get_build_graph(key)
 
3403
                for idx, (sub_key, text, num_lines) in enumerate(
 
3404
                        self._extract_texts(records)):
 
3405
                    if pb is not None:
 
3406
                        pb.update(gettext('annotating'), idx, len(records))
 
3407
                    yield sub_key, text, num_lines
 
3408
                for sub_key in ann_keys:
 
3409
                    text = self._text_cache[sub_key]
 
3410
                    num_lines = len(text)  # bad assumption
 
3411
                    yield sub_key, text, num_lines
 
3412
                return
 
3413
            except errors.RetryWithNewPacks as e:
 
3414
                self._vf._access.reload_or_raise(e)
 
3415
                # The cached build_details are no longer valid
 
3416
                self._all_build_details.clear()
 
3417
 
 
3418
    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
 
3419
        parent_lines = self._text_cache[compression_parent]
 
3420
        blocks = list(KnitContent.get_line_delta_blocks(
 
3421
            delta, parent_lines, lines))
 
3422
        self._matching_blocks[(key, compression_parent)] = blocks
 
3423
 
 
3424
    def _expand_record(self, key, parent_keys, compression_parent, record,
 
3425
                       record_details):
 
3426
        delta = None
 
3427
        if compression_parent:
 
3428
            if compression_parent not in self._content_objects:
 
3429
                # Waiting for the parent
 
3430
                self._pending_deltas.setdefault(compression_parent, []).append(
 
3431
                    (key, parent_keys, record, record_details))
 
3432
                return None
 
3433
            # We have the basis parent, so expand the delta
 
3434
            num = self._num_compression_children[compression_parent]
 
3435
            num -= 1
 
3436
            if num == 0:
 
3437
                base_content = self._content_objects.pop(compression_parent)
 
3438
                self._num_compression_children.pop(compression_parent)
 
3439
            else:
 
3440
                self._num_compression_children[compression_parent] = num
 
3441
                base_content = self._content_objects[compression_parent]
 
3442
            # It is tempting to want to copy_base_content=False for the last
 
3443
            # child object. However, whenever noeol=False,
 
3444
            # self._text_cache[parent_key] is content._lines. So mutating it
 
3445
            # gives very bad results.
 
3446
            # The alternative is to copy the lines into text cache, but then we
 
3447
            # are copying anyway, so just do it here.
 
3448
            content, delta = self._vf._factory.parse_record(
 
3449
                key, record, record_details, base_content,
 
3450
                copy_base_content=True)
 
3451
        else:
 
3452
            # Fulltext record
 
3453
            content, _ = self._vf._factory.parse_record(
 
3454
                key, record, record_details, None)
 
3455
        if self._num_compression_children.get(key, 0) > 0:
 
3456
            self._content_objects[key] = content
 
3457
        lines = content.text()
 
3458
        self._text_cache[key] = lines
 
3459
        if delta is not None:
 
3460
            self._cache_delta_blocks(key, compression_parent, delta, lines)
 
3461
        return lines
 
3462
 
 
3463
    def _get_parent_annotations_and_matches(self, key, text, parent_key):
 
3464
        """Get the list of annotations for the parent, and the matching lines.
 
3465
 
 
3466
        :param text: The opaque value given by _get_needed_texts
 
3467
        :param parent_key: The key for the parent text
 
3468
        :return: (parent_annotations, matching_blocks)
 
3469
            parent_annotations is a list as long as the number of lines in
 
3470
                parent
 
3471
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
 
3472
                indicating which lines match between the two texts
 
3473
        """
 
3474
        block_key = (key, parent_key)
 
3475
        if block_key in self._matching_blocks:
 
3476
            blocks = self._matching_blocks.pop(block_key)
 
3477
            parent_annotations = self._annotations_cache[parent_key]
 
3478
            return parent_annotations, blocks
 
3479
        return annotate.Annotator._get_parent_annotations_and_matches(self,
 
3480
                                                                      key, text, parent_key)
 
3481
 
 
3482
    def _process_pending(self, key):
 
3483
        """The content for 'key' was just processed.
 
3484
 
 
3485
        Determine if there is any more pending work to be processed.
 
3486
        """
 
3487
        to_return = []
 
3488
        if key in self._pending_deltas:
 
3489
            compression_parent = key
 
3490
            children = self._pending_deltas.pop(key)
 
3491
            for child_key, parent_keys, record, record_details in children:
 
3492
                lines = self._expand_record(child_key, parent_keys,
 
3493
                                            compression_parent,
 
3494
                                            record, record_details)
 
3495
                if self._check_ready_for_annotations(child_key, parent_keys):
 
3496
                    to_return.append(child_key)
 
3497
        # Also check any children that are waiting for this parent to be
 
3498
        # annotation ready
 
3499
        if key in self._pending_annotation:
 
3500
            children = self._pending_annotation.pop(key)
 
3501
            to_return.extend([c for c, p_keys in children
 
3502
                              if self._check_ready_for_annotations(c, p_keys)])
 
3503
        return to_return
 
3504
 
 
3505
    def _check_ready_for_annotations(self, key, parent_keys):
 
3506
        """return true if this text is ready to be yielded.
 
3507
 
 
3508
        Otherwise, this will return False, and queue the text into
 
3509
        self._pending_annotation
 
3510
        """
 
3511
        for parent_key in parent_keys:
 
3512
            if parent_key not in self._annotations_cache:
 
3513
                # still waiting on at least one parent text, so queue it up
 
3514
                # Note that if there are multiple parents, we need to wait
 
3515
                # for all of them.
 
3516
                self._pending_annotation.setdefault(parent_key,
 
3517
                                                    []).append((key, parent_keys))
 
3518
                return False
 
3519
        return True
 
3520
 
 
3521
    def _extract_texts(self, records):
 
3522
        """Extract the various texts needed based on records"""
 
3523
        # We iterate in the order read, rather than a strict order requested
 
3524
        # However, process what we can, and put off to the side things that
 
3525
        # still need parents, cleaning them up when those parents are
 
3526
        # processed.
 
3527
        # Basic data flow:
 
3528
        #   1) As 'records' are read, see if we can expand these records into
 
3529
        #      Content objects (and thus lines)
 
3530
        #   2) If a given line-delta is waiting on its compression parent, it
 
3531
        #      gets queued up into self._pending_deltas, otherwise we expand
 
3532
        #      it, and put it into self._text_cache and self._content_objects
 
3533
        #   3) If we expanded the text, we will then check to see if all
 
3534
        #      parents have also been processed. If so, this text gets yielded,
 
3535
        #      else this record gets set aside into pending_annotation
 
3536
        #   4) Further, if we expanded the text in (2), we will then check to
 
3537
        #      see if there are any children in self._pending_deltas waiting to
 
3538
        #      also be processed. If so, we go back to (2) for those
 
3539
        #   5) Further again, if we yielded the text, we can then check if that
 
3540
        #      'unlocks' any of the texts in pending_annotations, which should
 
3541
        #      then get yielded as well
 
3542
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
 
3543
        # compression child could unlock yet another, and yielding a fulltext
 
3544
        # will also 'unlock' the children that are waiting on that annotation.
 
3545
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
 
3546
        # if other parents are also waiting.)
 
3547
        # We want to yield content before expanding child content objects, so
 
3548
        # that we know when we can re-use the content lines, and the annotation
 
3549
        # code can know when it can stop caching fulltexts, as well.
 
3550
 
 
3551
        # Children that are missing their compression parent
 
3552
        pending_deltas = {}
 
3553
        for (key, record, digest) in self._vf._read_records_iter(records):
 
3554
            # ghosts?
 
3555
            details = self._all_build_details[key]
 
3556
            (_, compression_parent, parent_keys, record_details) = details
 
3557
            lines = self._expand_record(key, parent_keys, compression_parent,
 
3558
                                        record, record_details)
 
3559
            if lines is None:
 
3560
                # Pending delta should be queued up
 
3561
                continue
 
3562
            # At this point, we may be able to yield this content, if all
 
3563
            # parents are also finished
 
3564
            yield_this_text = self._check_ready_for_annotations(key,
 
3565
                                                                parent_keys)
 
3566
            if yield_this_text:
 
3567
                # All parents present
 
3568
                yield key, lines, len(lines)
 
3569
            to_process = self._process_pending(key)
 
3570
            while to_process:
 
3571
                this_process = to_process
 
3572
                to_process = []
 
3573
                for key in this_process:
 
3574
                    lines = self._text_cache[key]
 
3575
                    yield key, lines, len(lines)
 
3576
                    to_process.extend(self._process_pending(key))
 
3577
 
 
3578
 
 
3579
try:
 
3580
    from ._knit_load_data_pyx import _load_data_c as _load_data
 
3581
except ImportError as e:
 
3582
    osutils.failed_to_load_extension(e)
 
3583
    from ._knit_load_data_py import _load_data_py as _load_data