/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-25 02:03:41 UTC
  • mfrom: (4187.3.6 remote-pack-hack)
  • Revision ID: pqm@pqm.ubuntu.com-20090325020341-dmq0yek061gtungf
(andrew) Buffer writes when pushing to a pack repository on a
        pre-1.12 smart server.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Knit versionedfile implementation.
 
18
 
 
19
A knit is a versioned file implementation that supports efficient append only
 
20
updates.
 
21
 
 
22
Knit file layout:
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
 
26
end-marker; simply "end VERSION"
 
27
 
 
28
delta can be line or full contents.a
 
29
... the 8's there are the index number of the annotation.
 
30
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
 
31
59,59,3
 
32
8
 
33
8         if ie.executable:
 
34
8             e.set('executable', 'yes')
 
35
130,130,2
 
36
8         if elt.get('executable') == 'yes':
 
37
8             ie.executable = True
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
 
39
 
 
40
 
 
41
whats in an index:
 
42
09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
 
43
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
 
44
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
 
45
09:33 < lifeless> right
 
46
09:33 < jrydberg> lifeless: the position and size is the range in the data file
 
47
 
 
48
 
 
49
so the index sequence is the dictionary compressed sequence number used
 
50
in the deltas to provide line annotation
 
51
 
 
52
"""
 
53
 
 
54
 
 
55
from cStringIO import StringIO
 
56
from itertools import izip, chain
 
57
import operator
 
58
import os
 
59
import sys
 
60
 
 
61
from bzrlib.lazy_import import lazy_import
 
62
lazy_import(globals(), """
 
63
from bzrlib import (
 
64
    annotate,
 
65
    debug,
 
66
    diff,
 
67
    graph as _mod_graph,
 
68
    index as _mod_index,
 
69
    lru_cache,
 
70
    pack,
 
71
    progress,
 
72
    trace,
 
73
    tsort,
 
74
    tuned_gzip,
 
75
    )
 
76
""")
 
77
from bzrlib import (
 
78
    errors,
 
79
    osutils,
 
80
    patiencediff,
 
81
    )
 
82
from bzrlib.errors import (
 
83
    FileExists,
 
84
    NoSuchFile,
 
85
    KnitError,
 
86
    InvalidRevisionId,
 
87
    KnitCorrupt,
 
88
    KnitHeaderError,
 
89
    RevisionNotPresent,
 
90
    RevisionAlreadyPresent,
 
91
    SHA1KnitCorrupt,
 
92
    )
 
93
from bzrlib.osutils import (
 
94
    contains_whitespace,
 
95
    contains_linebreaks,
 
96
    sha_string,
 
97
    sha_strings,
 
98
    split_lines,
 
99
    )
 
100
from bzrlib.versionedfile import (
 
101
    AbsentContentFactory,
 
102
    adapter_registry,
 
103
    ConstantMapper,
 
104
    ContentFactory,
 
105
    ChunkedContentFactory,
 
106
    sort_groupcompress,
 
107
    VersionedFile,
 
108
    VersionedFiles,
 
109
    )
 
110
 
 
111
 
 
112
# TODO: Split out code specific to this format into an associated object.
 
113
 
 
114
# TODO: Can we put in some kind of value to check that the index and data
 
115
# files belong together?
 
116
 
 
117
# TODO: accommodate binaries, perhaps by storing a byte count
 
118
 
 
119
# TODO: function to check whole file
 
120
 
 
121
# TODO: atomically append data, then measure backwards from the cursor
 
122
# position after writing to work out where it was located.  we may need to
 
123
# bypass python file buffering.
 
124
 
 
125
DATA_SUFFIX = '.knit'
 
126
INDEX_SUFFIX = '.kndx'
 
127
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
 
128
 
 
129
 
 
130
class KnitAdapter(object):
 
131
    """Base class for knit record adaption."""
 
132
 
 
133
    def __init__(self, basis_vf):
 
134
        """Create an adapter which accesses full texts from basis_vf.
 
135
 
 
136
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
137
            May be None for adapters that do not need to access basis texts.
 
138
        """
 
139
        self._data = KnitVersionedFiles(None, None)
 
140
        self._annotate_factory = KnitAnnotateFactory()
 
141
        self._plain_factory = KnitPlainFactory()
 
142
        self._basis_vf = basis_vf
 
143
 
 
144
 
 
145
class FTAnnotatedToUnannotated(KnitAdapter):
 
146
    """An adapter from FT annotated knits to unannotated ones."""
 
147
 
 
148
    def get_bytes(self, factory):
 
149
        annotated_compressed_bytes = factory._raw_record
 
150
        rec, contents = \
 
151
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
152
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
153
        size, bytes = self._data._record_to_data((rec[1],), rec[3], content.text())
 
154
        return bytes
 
155
 
 
156
 
 
157
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
158
    """An adapter for deltas from annotated to unannotated."""
 
159
 
 
160
    def get_bytes(self, factory):
 
161
        annotated_compressed_bytes = factory._raw_record
 
162
        rec, contents = \
 
163
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
164
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
165
            plain=True)
 
166
        contents = self._plain_factory.lower_line_delta(delta)
 
167
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
 
168
        return bytes
 
169
 
 
170
 
 
171
class FTAnnotatedToFullText(KnitAdapter):
 
172
    """An adapter from FT annotated knits to unannotated ones."""
 
173
 
 
174
    def get_bytes(self, factory):
 
175
        annotated_compressed_bytes = factory._raw_record
 
176
        rec, contents = \
 
177
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
178
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
 
179
            contents, factory._build_details, None)
 
180
        return ''.join(content.text())
 
181
 
 
182
 
 
183
class DeltaAnnotatedToFullText(KnitAdapter):
 
184
    """An adapter for deltas from annotated to unannotated."""
 
185
 
 
186
    def get_bytes(self, factory):
 
187
        annotated_compressed_bytes = factory._raw_record
 
188
        rec, contents = \
 
189
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
190
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
191
            plain=True)
 
192
        compression_parent = factory.parents[0]
 
193
        basis_entry = self._basis_vf.get_record_stream(
 
194
            [compression_parent], 'unordered', True).next()
 
195
        if basis_entry.storage_kind == 'absent':
 
196
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
197
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
198
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
199
        # Manually apply the delta because we have one annotated content and
 
200
        # one plain.
 
201
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
202
        basis_content.apply_delta(delta, rec[1])
 
203
        basis_content._should_strip_eol = factory._build_details[1]
 
204
        return ''.join(basis_content.text())
 
205
 
 
206
 
 
207
class FTPlainToFullText(KnitAdapter):
 
208
    """An adapter from FT plain knits to unannotated ones."""
 
209
 
 
210
    def get_bytes(self, factory):
 
211
        compressed_bytes = factory._raw_record
 
212
        rec, contents = \
 
213
            self._data._parse_record_unchecked(compressed_bytes)
 
214
        content, delta = self._plain_factory.parse_record(factory.key[-1],
 
215
            contents, factory._build_details, None)
 
216
        return ''.join(content.text())
 
217
 
 
218
 
 
219
class DeltaPlainToFullText(KnitAdapter):
 
220
    """An adapter for deltas from annotated to unannotated."""
 
221
 
 
222
    def get_bytes(self, factory):
 
223
        compressed_bytes = factory._raw_record
 
224
        rec, contents = \
 
225
            self._data._parse_record_unchecked(compressed_bytes)
 
226
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
227
        compression_parent = factory.parents[0]
 
228
        # XXX: string splitting overhead.
 
229
        basis_entry = self._basis_vf.get_record_stream(
 
230
            [compression_parent], 'unordered', True).next()
 
231
        if basis_entry.storage_kind == 'absent':
 
232
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
233
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
234
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
235
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
236
        # Manually apply the delta because we have one annotated content and
 
237
        # one plain.
 
238
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
239
            factory._build_details, basis_content)
 
240
        return ''.join(content.text())
 
241
 
 
242
 
 
243
class KnitContentFactory(ContentFactory):
 
244
    """Content factory for streaming from knits.
 
245
 
 
246
    :seealso ContentFactory:
 
247
    """
 
248
 
 
249
    def __init__(self, key, parents, build_details, sha1, raw_record,
 
250
        annotated, knit=None, network_bytes=None):
 
251
        """Create a KnitContentFactory for key.
 
252
 
 
253
        :param key: The key.
 
254
        :param parents: The parents.
 
255
        :param build_details: The build details as returned from
 
256
            get_build_details.
 
257
        :param sha1: The sha1 expected from the full text of this object.
 
258
        :param raw_record: The bytes of the knit data from disk.
 
259
        :param annotated: True if the raw data is annotated.
 
260
        :param network_bytes: None to calculate the network bytes on demand,
 
261
            not-none if they are already known.
 
262
        """
 
263
        ContentFactory.__init__(self)
 
264
        self.sha1 = sha1
 
265
        self.key = key
 
266
        self.parents = parents
 
267
        if build_details[0] == 'line-delta':
 
268
            kind = 'delta'
 
269
        else:
 
270
            kind = 'ft'
 
271
        if annotated:
 
272
            annotated_kind = 'annotated-'
 
273
        else:
 
274
            annotated_kind = ''
 
275
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
276
        self._raw_record = raw_record
 
277
        self._network_bytes = network_bytes
 
278
        self._build_details = build_details
 
279
        self._knit = knit
 
280
 
 
281
    def _create_network_bytes(self):
 
282
        """Create a fully serialised network version for transmission."""
 
283
        # storage_kind, key, parents, Noeol, raw_record
 
284
        key_bytes = '\x00'.join(self.key)
 
285
        if self.parents is None:
 
286
            parent_bytes = 'None:'
 
287
        else:
 
288
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
289
        if self._build_details[1]:
 
290
            noeol = 'N'
 
291
        else:
 
292
            noeol = ' '
 
293
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
294
            parent_bytes, noeol, self._raw_record)
 
295
        self._network_bytes = network_bytes
 
296
 
 
297
    def get_bytes_as(self, storage_kind):
 
298
        if storage_kind == self.storage_kind:
 
299
            if self._network_bytes is None:
 
300
                self._create_network_bytes()
 
301
            return self._network_bytes
 
302
        if ('-ft-' in self.storage_kind and
 
303
            storage_kind in ('chunked', 'fulltext')):
 
304
            adapter_key = (self.storage_kind, 'fulltext')
 
305
            adapter_factory = adapter_registry.get(adapter_key)
 
306
            adapter = adapter_factory(None)
 
307
            bytes = adapter.get_bytes(self)
 
308
            if storage_kind == 'chunked':
 
309
                return [bytes]
 
310
            else:
 
311
                return bytes
 
312
        if self._knit is not None:
 
313
            # Not redundant with direct conversion above - that only handles
 
314
            # fulltext cases.
 
315
            if storage_kind == 'chunked':
 
316
                return self._knit.get_lines(self.key[0])
 
317
            elif storage_kind == 'fulltext':
 
318
                return self._knit.get_text(self.key[0])
 
319
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
320
            self.storage_kind)
 
321
 
 
322
 
 
323
class LazyKnitContentFactory(ContentFactory):
 
324
    """A ContentFactory which can either generate full text or a wire form.
 
325
 
 
326
    :seealso ContentFactory:
 
327
    """
 
328
 
 
329
    def __init__(self, key, parents, generator, first):
 
330
        """Create a LazyKnitContentFactory.
 
331
 
 
332
        :param key: The key of the record.
 
333
        :param parents: The parents of the record.
 
334
        :param generator: A _ContentMapGenerator containing the record for this
 
335
            key.
 
336
        :param first: Is this the first content object returned from generator?
 
337
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
338
            knit-delta-closure-ref
 
339
        """
 
340
        self.key = key
 
341
        self.parents = parents
 
342
        self.sha1 = None
 
343
        self._generator = generator
 
344
        self.storage_kind = "knit-delta-closure"
 
345
        if not first:
 
346
            self.storage_kind = self.storage_kind + "-ref"
 
347
        self._first = first
 
348
 
 
349
    def get_bytes_as(self, storage_kind):
 
350
        if storage_kind == self.storage_kind:
 
351
            if self._first:
 
352
                return self._generator._wire_bytes()
 
353
            else:
 
354
                # all the keys etc are contained in the bytes returned in the
 
355
                # first record.
 
356
                return ''
 
357
        if storage_kind in ('chunked', 'fulltext'):
 
358
            chunks = self._generator._get_one_work(self.key).text()
 
359
            if storage_kind == 'chunked':
 
360
                return chunks
 
361
            else:
 
362
                return ''.join(chunks)
 
363
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
364
            self.storage_kind)
 
365
 
 
366
 
 
367
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
368
    """Convert a network record to a iterator over stream records.
 
369
 
 
370
    :param storage_kind: The storage kind of the record.
 
371
        Must be 'knit-delta-closure'.
 
372
    :param bytes: The bytes of the record on the network.
 
373
    """
 
374
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
375
    return generator.get_record_stream()
 
376
 
 
377
 
 
378
def knit_network_to_record(storage_kind, bytes, line_end):
 
379
    """Convert a network record to a record object.
 
380
 
 
381
    :param storage_kind: The storage kind of the record.
 
382
    :param bytes: The bytes of the record on the network.
 
383
    """
 
384
    start = line_end
 
385
    line_end = bytes.find('\n', start)
 
386
    key = tuple(bytes[start:line_end].split('\x00'))
 
387
    start = line_end + 1
 
388
    line_end = bytes.find('\n', start)
 
389
    parent_line = bytes[start:line_end]
 
390
    if parent_line == 'None:':
 
391
        parents = None
 
392
    else:
 
393
        parents = tuple(
 
394
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
395
             if segment])
 
396
    start = line_end + 1
 
397
    noeol = bytes[start] == 'N'
 
398
    if 'ft' in storage_kind:
 
399
        method = 'fulltext'
 
400
    else:
 
401
        method = 'line-delta'
 
402
    build_details = (method, noeol)
 
403
    start = start + 1
 
404
    raw_record = bytes[start:]
 
405
    annotated = 'annotated' in storage_kind
 
406
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
407
        annotated, network_bytes=bytes)]
 
408
 
 
409
 
 
410
class KnitContent(object):
 
411
    """Content of a knit version to which deltas can be applied.
 
412
 
 
413
    This is always stored in memory as a list of lines with \n at the end,
 
414
    plus a flag saying if the final ending is really there or not, because that
 
415
    corresponds to the on-disk knit representation.
 
416
    """
 
417
 
 
418
    def __init__(self):
 
419
        self._should_strip_eol = False
 
420
 
 
421
    def apply_delta(self, delta, new_version_id):
 
422
        """Apply delta to this object to become new_version_id."""
 
423
        raise NotImplementedError(self.apply_delta)
 
424
 
 
425
    def line_delta_iter(self, new_lines):
 
426
        """Generate line-based delta from this content to new_lines."""
 
427
        new_texts = new_lines.text()
 
428
        old_texts = self.text()
 
429
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
 
430
        for tag, i1, i2, j1, j2 in s.get_opcodes():
 
431
            if tag == 'equal':
 
432
                continue
 
433
            # ofrom, oto, length, data
 
434
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
 
435
 
 
436
    def line_delta(self, new_lines):
 
437
        return list(self.line_delta_iter(new_lines))
 
438
 
 
439
    @staticmethod
 
440
    def get_line_delta_blocks(knit_delta, source, target):
 
441
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
 
442
        target_len = len(target)
 
443
        s_pos = 0
 
444
        t_pos = 0
 
445
        for s_begin, s_end, t_len, new_text in knit_delta:
 
446
            true_n = s_begin - s_pos
 
447
            n = true_n
 
448
            if n > 0:
 
449
                # knit deltas do not provide reliable info about whether the
 
450
                # last line of a file matches, due to eol handling.
 
451
                if source[s_pos + n -1] != target[t_pos + n -1]:
 
452
                    n-=1
 
453
                if n > 0:
 
454
                    yield s_pos, t_pos, n
 
455
            t_pos += t_len + true_n
 
456
            s_pos = s_end
 
457
        n = target_len - t_pos
 
458
        if n > 0:
 
459
            if source[s_pos + n -1] != target[t_pos + n -1]:
 
460
                n-=1
 
461
            if n > 0:
 
462
                yield s_pos, t_pos, n
 
463
        yield s_pos + (target_len - t_pos), target_len, 0
 
464
 
 
465
 
 
466
class AnnotatedKnitContent(KnitContent):
 
467
    """Annotated content."""
 
468
 
 
469
    def __init__(self, lines):
 
470
        KnitContent.__init__(self)
 
471
        self._lines = lines
 
472
 
 
473
    def annotate(self):
 
474
        """Return a list of (origin, text) for each content line."""
 
475
        lines = self._lines[:]
 
476
        if self._should_strip_eol:
 
477
            origin, last_line = lines[-1]
 
478
            lines[-1] = (origin, last_line.rstrip('\n'))
 
479
        return lines
 
480
 
 
481
    def apply_delta(self, delta, new_version_id):
 
482
        """Apply delta to this object to become new_version_id."""
 
483
        offset = 0
 
484
        lines = self._lines
 
485
        for start, end, count, delta_lines in delta:
 
486
            lines[offset+start:offset+end] = delta_lines
 
487
            offset = offset + (start - end) + count
 
488
 
 
489
    def text(self):
 
490
        try:
 
491
            lines = [text for origin, text in self._lines]
 
492
        except ValueError, e:
 
493
            # most commonly (only?) caused by the internal form of the knit
 
494
            # missing annotation information because of a bug - see thread
 
495
            # around 20071015
 
496
            raise KnitCorrupt(self,
 
497
                "line in annotated knit missing annotation information: %s"
 
498
                % (e,))
 
499
        if self._should_strip_eol:
 
500
            lines[-1] = lines[-1].rstrip('\n')
 
501
        return lines
 
502
 
 
503
    def copy(self):
 
504
        return AnnotatedKnitContent(self._lines[:])
 
505
 
 
506
 
 
507
class PlainKnitContent(KnitContent):
 
508
    """Unannotated content.
 
509
 
 
510
    When annotate[_iter] is called on this content, the same version is reported
 
511
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
 
512
    objects.
 
513
    """
 
514
 
 
515
    def __init__(self, lines, version_id):
 
516
        KnitContent.__init__(self)
 
517
        self._lines = lines
 
518
        self._version_id = version_id
 
519
 
 
520
    def annotate(self):
 
521
        """Return a list of (origin, text) for each content line."""
 
522
        return [(self._version_id, line) for line in self._lines]
 
523
 
 
524
    def apply_delta(self, delta, new_version_id):
 
525
        """Apply delta to this object to become new_version_id."""
 
526
        offset = 0
 
527
        lines = self._lines
 
528
        for start, end, count, delta_lines in delta:
 
529
            lines[offset+start:offset+end] = delta_lines
 
530
            offset = offset + (start - end) + count
 
531
        self._version_id = new_version_id
 
532
 
 
533
    def copy(self):
 
534
        return PlainKnitContent(self._lines[:], self._version_id)
 
535
 
 
536
    def text(self):
 
537
        lines = self._lines
 
538
        if self._should_strip_eol:
 
539
            lines = lines[:]
 
540
            lines[-1] = lines[-1].rstrip('\n')
 
541
        return lines
 
542
 
 
543
 
 
544
class _KnitFactory(object):
 
545
    """Base class for common Factory functions."""
 
546
 
 
547
    def parse_record(self, version_id, record, record_details,
 
548
                     base_content, copy_base_content=True):
 
549
        """Parse a record into a full content object.
 
550
 
 
551
        :param version_id: The official version id for this content
 
552
        :param record: The data returned by read_records_iter()
 
553
        :param record_details: Details about the record returned by
 
554
            get_build_details
 
555
        :param base_content: If get_build_details returns a compression_parent,
 
556
            you must return a base_content here, else use None
 
557
        :param copy_base_content: When building from the base_content, decide
 
558
            you can either copy it and return a new object, or modify it in
 
559
            place.
 
560
        :return: (content, delta) A Content object and possibly a line-delta,
 
561
            delta may be None
 
562
        """
 
563
        method, noeol = record_details
 
564
        if method == 'line-delta':
 
565
            if copy_base_content:
 
566
                content = base_content.copy()
 
567
            else:
 
568
                content = base_content
 
569
            delta = self.parse_line_delta(record, version_id)
 
570
            content.apply_delta(delta, version_id)
 
571
        else:
 
572
            content = self.parse_fulltext(record, version_id)
 
573
            delta = None
 
574
        content._should_strip_eol = noeol
 
575
        return (content, delta)
 
576
 
 
577
 
 
578
class KnitAnnotateFactory(_KnitFactory):
 
579
    """Factory for creating annotated Content objects."""
 
580
 
 
581
    annotated = True
 
582
 
 
583
    def make(self, lines, version_id):
 
584
        num_lines = len(lines)
 
585
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
 
586
 
 
587
    def parse_fulltext(self, content, version_id):
 
588
        """Convert fulltext to internal representation
 
589
 
 
590
        fulltext content is of the format
 
591
        revid(utf8) plaintext\n
 
592
        internal representation is of the format:
 
593
        (revid, plaintext)
 
594
        """
 
595
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
 
596
        #       but the code itself doesn't really depend on that.
 
597
        #       Figure out a way to not require the overhead of turning the
 
598
        #       list back into tuples.
 
599
        lines = [tuple(line.split(' ', 1)) for line in content]
 
600
        return AnnotatedKnitContent(lines)
 
601
 
 
602
    def parse_line_delta_iter(self, lines):
 
603
        return iter(self.parse_line_delta(lines))
 
604
 
 
605
    def parse_line_delta(self, lines, version_id, plain=False):
 
606
        """Convert a line based delta into internal representation.
 
607
 
 
608
        line delta is in the form of:
 
609
        intstart intend intcount
 
610
        1..count lines:
 
611
        revid(utf8) newline\n
 
612
        internal representation is
 
613
        (start, end, count, [1..count tuples (revid, newline)])
 
614
 
 
615
        :param plain: If True, the lines are returned as a plain
 
616
            list without annotations, not as a list of (origin, content) tuples, i.e.
 
617
            (start, end, count, [1..count newline])
 
618
        """
 
619
        result = []
 
620
        lines = iter(lines)
 
621
        next = lines.next
 
622
 
 
623
        cache = {}
 
624
        def cache_and_return(line):
 
625
            origin, text = line.split(' ', 1)
 
626
            return cache.setdefault(origin, origin), text
 
627
 
 
628
        # walk through the lines parsing.
 
629
        # Note that the plain test is explicitly pulled out of the
 
630
        # loop to minimise any performance impact
 
631
        if plain:
 
632
            for header in lines:
 
633
                start, end, count = [int(n) for n in header.split(',')]
 
634
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
 
635
                result.append((start, end, count, contents))
 
636
        else:
 
637
            for header in lines:
 
638
                start, end, count = [int(n) for n in header.split(',')]
 
639
                contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
 
640
                result.append((start, end, count, contents))
 
641
        return result
 
642
 
 
643
    def get_fulltext_content(self, lines):
 
644
        """Extract just the content lines from a fulltext."""
 
645
        return (line.split(' ', 1)[1] for line in lines)
 
646
 
 
647
    def get_linedelta_content(self, lines):
 
648
        """Extract just the content from a line delta.
 
649
 
 
650
        This doesn't return all of the extra information stored in a delta.
 
651
        Only the actual content lines.
 
652
        """
 
653
        lines = iter(lines)
 
654
        next = lines.next
 
655
        for header in lines:
 
656
            header = header.split(',')
 
657
            count = int(header[2])
 
658
            for i in xrange(count):
 
659
                origin, text = next().split(' ', 1)
 
660
                yield text
 
661
 
 
662
    def lower_fulltext(self, content):
 
663
        """convert a fulltext content record into a serializable form.
 
664
 
 
665
        see parse_fulltext which this inverts.
 
666
        """
 
667
        # TODO: jam 20070209 We only do the caching thing to make sure that
 
668
        #       the origin is a valid utf-8 line, eventually we could remove it
 
669
        return ['%s %s' % (o, t) for o, t in content._lines]
 
670
 
 
671
    def lower_line_delta(self, delta):
 
672
        """convert a delta into a serializable form.
 
673
 
 
674
        See parse_line_delta which this inverts.
 
675
        """
 
676
        # TODO: jam 20070209 We only do the caching thing to make sure that
 
677
        #       the origin is a valid utf-8 line, eventually we could remove it
 
678
        out = []
 
679
        for start, end, c, lines in delta:
 
680
            out.append('%d,%d,%d\n' % (start, end, c))
 
681
            out.extend(origin + ' ' + text
 
682
                       for origin, text in lines)
 
683
        return out
 
684
 
 
685
    def annotate(self, knit, key):
 
686
        content = knit._get_content(key)
 
687
        # adjust for the fact that serialised annotations are only key suffixes
 
688
        # for this factory.
 
689
        if type(key) == tuple:
 
690
            prefix = key[:-1]
 
691
            origins = content.annotate()
 
692
            result = []
 
693
            for origin, line in origins:
 
694
                result.append((prefix + (origin,), line))
 
695
            return result
 
696
        else:
 
697
            # XXX: This smells a bit.  Why would key ever be a non-tuple here?
 
698
            # Aren't keys defined to be tuples?  -- spiv 20080618
 
699
            return content.annotate()
 
700
 
 
701
 
 
702
class KnitPlainFactory(_KnitFactory):
 
703
    """Factory for creating plain Content objects."""
 
704
 
 
705
    annotated = False
 
706
 
 
707
    def make(self, lines, version_id):
 
708
        return PlainKnitContent(lines, version_id)
 
709
 
 
710
    def parse_fulltext(self, content, version_id):
 
711
        """This parses an unannotated fulltext.
 
712
 
 
713
        Note that this is not a noop - the internal representation
 
714
        has (versionid, line) - its just a constant versionid.
 
715
        """
 
716
        return self.make(content, version_id)
 
717
 
 
718
    def parse_line_delta_iter(self, lines, version_id):
 
719
        cur = 0
 
720
        num_lines = len(lines)
 
721
        while cur < num_lines:
 
722
            header = lines[cur]
 
723
            cur += 1
 
724
            start, end, c = [int(n) for n in header.split(',')]
 
725
            yield start, end, c, lines[cur:cur+c]
 
726
            cur += c
 
727
 
 
728
    def parse_line_delta(self, lines, version_id):
 
729
        return list(self.parse_line_delta_iter(lines, version_id))
 
730
 
 
731
    def get_fulltext_content(self, lines):
 
732
        """Extract just the content lines from a fulltext."""
 
733
        return iter(lines)
 
734
 
 
735
    def get_linedelta_content(self, lines):
 
736
        """Extract just the content from a line delta.
 
737
 
 
738
        This doesn't return all of the extra information stored in a delta.
 
739
        Only the actual content lines.
 
740
        """
 
741
        lines = iter(lines)
 
742
        next = lines.next
 
743
        for header in lines:
 
744
            header = header.split(',')
 
745
            count = int(header[2])
 
746
            for i in xrange(count):
 
747
                yield next()
 
748
 
 
749
    def lower_fulltext(self, content):
 
750
        return content.text()
 
751
 
 
752
    def lower_line_delta(self, delta):
 
753
        out = []
 
754
        for start, end, c, lines in delta:
 
755
            out.append('%d,%d,%d\n' % (start, end, c))
 
756
            out.extend(lines)
 
757
        return out
 
758
 
 
759
    def annotate(self, knit, key):
 
760
        annotator = _KnitAnnotator(knit)
 
761
        return annotator.annotate(key)
 
762
 
 
763
 
 
764
 
 
765
def make_file_factory(annotated, mapper):
 
766
    """Create a factory for creating a file based KnitVersionedFiles.
 
767
 
 
768
    This is only functional enough to run interface tests, it doesn't try to
 
769
    provide a full pack environment.
 
770
 
 
771
    :param annotated: knit annotations are wanted.
 
772
    :param mapper: The mapper from keys to paths.
 
773
    """
 
774
    def factory(transport):
 
775
        index = _KndxIndex(transport, mapper, lambda:None, lambda:True, lambda:True)
 
776
        access = _KnitKeyAccess(transport, mapper)
 
777
        return KnitVersionedFiles(index, access, annotated=annotated)
 
778
    return factory
 
779
 
 
780
 
 
781
def make_pack_factory(graph, delta, keylength):
 
782
    """Create a factory for creating a pack based VersionedFiles.
 
783
 
 
784
    This is only functional enough to run interface tests, it doesn't try to
 
785
    provide a full pack environment.
 
786
 
 
787
    :param graph: Store a graph.
 
788
    :param delta: Delta compress contents.
 
789
    :param keylength: How long should keys be.
 
790
    """
 
791
    def factory(transport):
 
792
        parents = graph or delta
 
793
        ref_length = 0
 
794
        if graph:
 
795
            ref_length += 1
 
796
        if delta:
 
797
            ref_length += 1
 
798
            max_delta_chain = 200
 
799
        else:
 
800
            max_delta_chain = 0
 
801
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
 
802
            key_elements=keylength)
 
803
        stream = transport.open_write_stream('newpack')
 
804
        writer = pack.ContainerWriter(stream.write)
 
805
        writer.begin()
 
806
        index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
 
807
            deltas=delta, add_callback=graph_index.add_nodes)
 
808
        access = _DirectPackAccess({})
 
809
        access.set_writer(writer, graph_index, (transport, 'newpack'))
 
810
        result = KnitVersionedFiles(index, access,
 
811
            max_delta_chain=max_delta_chain)
 
812
        result.stream = stream
 
813
        result.writer = writer
 
814
        return result
 
815
    return factory
 
816
 
 
817
 
 
818
def cleanup_pack_knit(versioned_files):
 
819
    versioned_files.stream.close()
 
820
    versioned_files.writer.end()
 
821
 
 
822
 
 
823
def _get_total_build_size(self, keys, positions):
 
824
    """Determine the total bytes to build these keys.
 
825
 
 
826
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
827
    don't inherit from a common base.)
 
828
 
 
829
    :param keys: Keys that we want to build
 
830
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
831
        as returned by _get_components_positions)
 
832
    :return: Number of bytes to build those keys
 
833
    """
 
834
    all_build_index_memos = {}
 
835
    build_keys = keys
 
836
    while build_keys:
 
837
        next_keys = set()
 
838
        for key in build_keys:
 
839
            # This is mostly for the 'stacked' case
 
840
            # Where we will be getting the data from a fallback
 
841
            if key not in positions:
 
842
                continue
 
843
            _, index_memo, compression_parent = positions[key]
 
844
            all_build_index_memos[key] = index_memo
 
845
            if compression_parent not in all_build_index_memos:
 
846
                next_keys.add(compression_parent)
 
847
        build_keys = next_keys
 
848
    return sum([index_memo[2] for index_memo
 
849
                in all_build_index_memos.itervalues()])
 
850
 
 
851
 
 
852
class KnitVersionedFiles(VersionedFiles):
 
853
    """Storage for many versioned files using knit compression.
 
854
 
 
855
    Backend storage is managed by indices and data objects.
 
856
 
 
857
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
858
        parents, graph, compression and data location of entries in this
 
859
        KnitVersionedFiles.  Note that this is only the index for
 
860
        *this* vfs; if there are fallbacks they must be queried separately.
 
861
    """
 
862
 
 
863
    def __init__(self, index, data_access, max_delta_chain=200,
 
864
                 annotated=False, reload_func=None):
 
865
        """Create a KnitVersionedFiles with index and data_access.
 
866
 
 
867
        :param index: The index for the knit data.
 
868
        :param data_access: The access object to store and retrieve knit
 
869
            records.
 
870
        :param max_delta_chain: The maximum number of deltas to permit during
 
871
            insertion. Set to 0 to prohibit the use of deltas.
 
872
        :param annotated: Set to True to cause annotations to be calculated and
 
873
            stored during insertion.
 
874
        :param reload_func: An function that can be called if we think we need
 
875
            to reload the pack listing and try again. See
 
876
            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
 
877
        """
 
878
        self._index = index
 
879
        self._access = data_access
 
880
        self._max_delta_chain = max_delta_chain
 
881
        if annotated:
 
882
            self._factory = KnitAnnotateFactory()
 
883
        else:
 
884
            self._factory = KnitPlainFactory()
 
885
        self._fallback_vfs = []
 
886
        self._reload_func = reload_func
 
887
 
 
888
    def __repr__(self):
 
889
        return "%s(%r, %r)" % (
 
890
            self.__class__.__name__,
 
891
            self._index,
 
892
            self._access)
 
893
 
 
894
    def add_fallback_versioned_files(self, a_versioned_files):
 
895
        """Add a source of texts for texts not present in this knit.
 
896
 
 
897
        :param a_versioned_files: A VersionedFiles object.
 
898
        """
 
899
        self._fallback_vfs.append(a_versioned_files)
 
900
 
 
901
    def add_lines(self, key, parents, lines, parent_texts=None,
 
902
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
903
        check_content=True):
 
904
        """See VersionedFiles.add_lines()."""
 
905
        self._index._check_write_ok()
 
906
        self._check_add(key, lines, random_id, check_content)
 
907
        if parents is None:
 
908
            # The caller might pass None if there is no graph data, but kndx
 
909
            # indexes can't directly store that, so we give them
 
910
            # an empty tuple instead.
 
911
            parents = ()
 
912
        return self._add(key, lines, parents,
 
913
            parent_texts, left_matching_blocks, nostore_sha, random_id)
 
914
 
 
915
    def _add(self, key, lines, parents, parent_texts,
 
916
        left_matching_blocks, nostore_sha, random_id):
 
917
        """Add a set of lines on top of version specified by parents.
 
918
 
 
919
        Any versions not present will be converted into ghosts.
 
920
        """
 
921
        # first thing, if the content is something we don't need to store, find
 
922
        # that out.
 
923
        line_bytes = ''.join(lines)
 
924
        digest = sha_string(line_bytes)
 
925
        if nostore_sha == digest:
 
926
            raise errors.ExistingContent
 
927
 
 
928
        present_parents = []
 
929
        if parent_texts is None:
 
930
            parent_texts = {}
 
931
        # Do a single query to ascertain parent presence; we only compress
 
932
        # against parents in the same kvf.
 
933
        present_parent_map = self._index.get_parent_map(parents)
 
934
        for parent in parents:
 
935
            if parent in present_parent_map:
 
936
                present_parents.append(parent)
 
937
 
 
938
        # Currently we can only compress against the left most present parent.
 
939
        if (len(present_parents) == 0 or
 
940
            present_parents[0] != parents[0]):
 
941
            delta = False
 
942
        else:
 
943
            # To speed the extract of texts the delta chain is limited
 
944
            # to a fixed number of deltas.  This should minimize both
 
945
            # I/O and the time spend applying deltas.
 
946
            delta = self._check_should_delta(present_parents[0])
 
947
 
 
948
        text_length = len(line_bytes)
 
949
        options = []
 
950
        if lines:
 
951
            if lines[-1][-1] != '\n':
 
952
                # copy the contents of lines.
 
953
                lines = lines[:]
 
954
                options.append('no-eol')
 
955
                lines[-1] = lines[-1] + '\n'
 
956
                line_bytes += '\n'
 
957
 
 
958
        for element in key:
 
959
            if type(element) != str:
 
960
                raise TypeError("key contains non-strings: %r" % (key,))
 
961
        # Knit hunks are still last-element only
 
962
        version_id = key[-1]
 
963
        content = self._factory.make(lines, version_id)
 
964
        if 'no-eol' in options:
 
965
            # Hint to the content object that its text() call should strip the
 
966
            # EOL.
 
967
            content._should_strip_eol = True
 
968
        if delta or (self._factory.annotated and len(present_parents) > 0):
 
969
            # Merge annotations from parent texts if needed.
 
970
            delta_hunks = self._merge_annotations(content, present_parents,
 
971
                parent_texts, delta, self._factory.annotated,
 
972
                left_matching_blocks)
 
973
 
 
974
        if delta:
 
975
            options.append('line-delta')
 
976
            store_lines = self._factory.lower_line_delta(delta_hunks)
 
977
            size, bytes = self._record_to_data(key, digest,
 
978
                store_lines)
 
979
        else:
 
980
            options.append('fulltext')
 
981
            # isinstance is slower and we have no hierarchy.
 
982
            if self._factory.__class__ is KnitPlainFactory:
 
983
                # Use the already joined bytes saving iteration time in
 
984
                # _record_to_data.
 
985
                size, bytes = self._record_to_data(key, digest,
 
986
                    lines, [line_bytes])
 
987
            else:
 
988
                # get mixed annotation + content and feed it into the
 
989
                # serialiser.
 
990
                store_lines = self._factory.lower_fulltext(content)
 
991
                size, bytes = self._record_to_data(key, digest,
 
992
                    store_lines)
 
993
 
 
994
        access_memo = self._access.add_raw_records([(key, size)], bytes)[0]
 
995
        self._index.add_records(
 
996
            ((key, options, access_memo, parents),),
 
997
            random_id=random_id)
 
998
        return digest, text_length, content
 
999
 
 
1000
    def annotate(self, key):
 
1001
        """See VersionedFiles.annotate."""
 
1002
        return self._factory.annotate(self, key)
 
1003
 
 
1004
    def check(self, progress_bar=None):
 
1005
        """See VersionedFiles.check()."""
 
1006
        # This doesn't actually test extraction of everything, but that will
 
1007
        # impact 'bzr check' substantially, and needs to be integrated with
 
1008
        # care. However, it does check for the obvious problem of a delta with
 
1009
        # no basis.
 
1010
        keys = self._index.keys()
 
1011
        parent_map = self.get_parent_map(keys)
 
1012
        for key in keys:
 
1013
            if self._index.get_method(key) != 'fulltext':
 
1014
                compression_parent = parent_map[key][0]
 
1015
                if compression_parent not in parent_map:
 
1016
                    raise errors.KnitCorrupt(self,
 
1017
                        "Missing basis parent %s for %s" % (
 
1018
                        compression_parent, key))
 
1019
        for fallback_vfs in self._fallback_vfs:
 
1020
            fallback_vfs.check()
 
1021
 
 
1022
    def _check_add(self, key, lines, random_id, check_content):
 
1023
        """check that version_id and lines are safe to add."""
 
1024
        version_id = key[-1]
 
1025
        if contains_whitespace(version_id):
 
1026
            raise InvalidRevisionId(version_id, self)
 
1027
        self.check_not_reserved_id(version_id)
 
1028
        # TODO: If random_id==False and the key is already present, we should
 
1029
        # probably check that the existing content is identical to what is
 
1030
        # being inserted, and otherwise raise an exception.  This would make
 
1031
        # the bundle code simpler.
 
1032
        if check_content:
 
1033
            self._check_lines_not_unicode(lines)
 
1034
            self._check_lines_are_lines(lines)
 
1035
 
 
1036
    def _check_header(self, key, line):
 
1037
        rec = self._split_header(line)
 
1038
        self._check_header_version(rec, key[-1])
 
1039
        return rec
 
1040
 
 
1041
    def _check_header_version(self, rec, version_id):
 
1042
        """Checks the header version on original format knit records.
 
1043
 
 
1044
        These have the last component of the key embedded in the record.
 
1045
        """
 
1046
        if rec[1] != version_id:
 
1047
            raise KnitCorrupt(self,
 
1048
                'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
 
1049
 
 
1050
    def _check_should_delta(self, parent):
 
1051
        """Iterate back through the parent listing, looking for a fulltext.
 
1052
 
 
1053
        This is used when we want to decide whether to add a delta or a new
 
1054
        fulltext. It searches for _max_delta_chain parents. When it finds a
 
1055
        fulltext parent, it sees if the total size of the deltas leading up to
 
1056
        it is large enough to indicate that we want a new full text anyway.
 
1057
 
 
1058
        Return True if we should create a new delta, False if we should use a
 
1059
        full text.
 
1060
        """
 
1061
        delta_size = 0
 
1062
        fulltext_size = None
 
1063
        for count in xrange(self._max_delta_chain):
 
1064
            try:
 
1065
                # Note that this only looks in the index of this particular
 
1066
                # KnitVersionedFiles, not in the fallbacks.  This ensures that
 
1067
                # we won't store a delta spanning physical repository
 
1068
                # boundaries.
 
1069
                build_details = self._index.get_build_details([parent])
 
1070
                parent_details = build_details[parent]
 
1071
            except (RevisionNotPresent, KeyError), e:
 
1072
                # Some basis is not locally present: always fulltext
 
1073
                return False
 
1074
            index_memo, compression_parent, _, _ = parent_details
 
1075
            _, _, size = index_memo
 
1076
            if compression_parent is None:
 
1077
                fulltext_size = size
 
1078
                break
 
1079
            delta_size += size
 
1080
            # We don't explicitly check for presence because this is in an
 
1081
            # inner loop, and if it's missing it'll fail anyhow.
 
1082
            parent = compression_parent
 
1083
        else:
 
1084
            # We couldn't find a fulltext, so we must create a new one
 
1085
            return False
 
1086
        # Simple heuristic - if the total I/O wold be greater as a delta than
 
1087
        # the originally installed fulltext, we create a new fulltext.
 
1088
        return fulltext_size > delta_size
 
1089
 
 
1090
    def _build_details_to_components(self, build_details):
 
1091
        """Convert a build_details tuple to a position tuple."""
 
1092
        # record_details, access_memo, compression_parent
 
1093
        return build_details[3], build_details[0], build_details[1]
 
1094
 
 
1095
    def _get_components_positions(self, keys, allow_missing=False):
 
1096
        """Produce a map of position data for the components of keys.
 
1097
 
 
1098
        This data is intended to be used for retrieving the knit records.
 
1099
 
 
1100
        A dict of key to (record_details, index_memo, next, parents) is
 
1101
        returned.
 
1102
        method is the way referenced data should be applied.
 
1103
        index_memo is the handle to pass to the data access to actually get the
 
1104
            data
 
1105
        next is the build-parent of the version, or None for fulltexts.
 
1106
        parents is the version_ids of the parents of this version
 
1107
 
 
1108
        :param allow_missing: If True do not raise an error on a missing component,
 
1109
            just ignore it.
 
1110
        """
 
1111
        component_data = {}
 
1112
        pending_components = keys
 
1113
        while pending_components:
 
1114
            build_details = self._index.get_build_details(pending_components)
 
1115
            current_components = set(pending_components)
 
1116
            pending_components = set()
 
1117
            for key, details in build_details.iteritems():
 
1118
                (index_memo, compression_parent, parents,
 
1119
                 record_details) = details
 
1120
                method = record_details[0]
 
1121
                if compression_parent is not None:
 
1122
                    pending_components.add(compression_parent)
 
1123
                component_data[key] = self._build_details_to_components(details)
 
1124
            missing = current_components.difference(build_details)
 
1125
            if missing and not allow_missing:
 
1126
                raise errors.RevisionNotPresent(missing.pop(), self)
 
1127
        return component_data
 
1128
 
 
1129
    def _get_content(self, key, parent_texts={}):
 
1130
        """Returns a content object that makes up the specified
 
1131
        version."""
 
1132
        cached_version = parent_texts.get(key, None)
 
1133
        if cached_version is not None:
 
1134
            # Ensure the cache dict is valid.
 
1135
            if not self.get_parent_map([key]):
 
1136
                raise RevisionNotPresent(key, self)
 
1137
            return cached_version
 
1138
        generator = _VFContentMapGenerator(self, [key])
 
1139
        return generator._get_content(key)
 
1140
 
 
1141
    def get_parent_map(self, keys):
 
1142
        """Get a map of the graph parents of keys.
 
1143
 
 
1144
        :param keys: The keys to look up parents for.
 
1145
        :return: A mapping from keys to parents. Absent keys are absent from
 
1146
            the mapping.
 
1147
        """
 
1148
        return self._get_parent_map_with_sources(keys)[0]
 
1149
 
 
1150
    def _get_parent_map_with_sources(self, keys):
 
1151
        """Get a map of the parents of keys.
 
1152
 
 
1153
        :param keys: The keys to look up parents for.
 
1154
        :return: A tuple. The first element is a mapping from keys to parents.
 
1155
            Absent keys are absent from the mapping. The second element is a
 
1156
            list with the locations each key was found in. The first element
 
1157
            is the in-this-knit parents, the second the first fallback source,
 
1158
            and so on.
 
1159
        """
 
1160
        result = {}
 
1161
        sources = [self._index] + self._fallback_vfs
 
1162
        source_results = []
 
1163
        missing = set(keys)
 
1164
        for source in sources:
 
1165
            if not missing:
 
1166
                break
 
1167
            new_result = source.get_parent_map(missing)
 
1168
            source_results.append(new_result)
 
1169
            result.update(new_result)
 
1170
            missing.difference_update(set(new_result))
 
1171
        return result, source_results
 
1172
 
 
1173
    def _get_record_map(self, keys, allow_missing=False):
 
1174
        """Produce a dictionary of knit records.
 
1175
 
 
1176
        :return: {key:(record, record_details, digest, next)}
 
1177
            record
 
1178
                data returned from read_records (a KnitContentobject)
 
1179
            record_details
 
1180
                opaque information to pass to parse_record
 
1181
            digest
 
1182
                SHA1 digest of the full text after all steps are done
 
1183
            next
 
1184
                build-parent of the version, i.e. the leftmost ancestor.
 
1185
                Will be None if the record is not a delta.
 
1186
        :param keys: The keys to build a map for
 
1187
        :param allow_missing: If some records are missing, rather than
 
1188
            error, just return the data that could be generated.
 
1189
        """
 
1190
        raw_map = self._get_record_map_unparsed(keys,
 
1191
            allow_missing=allow_missing)
 
1192
        return self._raw_map_to_record_map(raw_map)
 
1193
 
 
1194
    def _raw_map_to_record_map(self, raw_map):
 
1195
        """Parse the contents of _get_record_map_unparsed.
 
1196
 
 
1197
        :return: see _get_record_map.
 
1198
        """
 
1199
        result = {}
 
1200
        for key in raw_map:
 
1201
            data, record_details, next = raw_map[key]
 
1202
            content, digest = self._parse_record(key[-1], data)
 
1203
            result[key] = content, record_details, digest, next
 
1204
        return result
 
1205
 
 
1206
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1207
        """Get the raw data for reconstructing keys without parsing it.
 
1208
 
 
1209
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1210
            key-> raw_bytes, (method, noeol), compression_parent
 
1211
        """
 
1212
        # This retries the whole request if anything fails. Potentially we
 
1213
        # could be a bit more selective. We could track the keys whose records
 
1214
        # we have successfully found, and then only request the new records
 
1215
        # from there. However, _get_components_positions grabs the whole build
 
1216
        # chain, which means we'll likely try to grab the same records again
 
1217
        # anyway. Also, can the build chains change as part of a pack
 
1218
        # operation? We wouldn't want to end up with a broken chain.
 
1219
        while True:
 
1220
            try:
 
1221
                position_map = self._get_components_positions(keys,
 
1222
                    allow_missing=allow_missing)
 
1223
                # key = component_id, r = record_details, i_m = index_memo,
 
1224
                # n = next
 
1225
                records = [(key, i_m) for key, (r, i_m, n)
 
1226
                                       in position_map.iteritems()]
 
1227
                # Sort by the index memo, so that we request records from the
 
1228
                # same pack file together, and in forward-sorted order
 
1229
                records.sort(key=operator.itemgetter(1))
 
1230
                raw_record_map = {}
 
1231
                for key, data in self._read_records_iter_unchecked(records):
 
1232
                    (record_details, index_memo, next) = position_map[key]
 
1233
                    raw_record_map[key] = data, record_details, next
 
1234
                return raw_record_map
 
1235
            except errors.RetryWithNewPacks, e:
 
1236
                self._access.reload_or_raise(e)
 
1237
 
 
1238
    @classmethod
 
1239
    def _split_by_prefix(cls, keys):
 
1240
        """For the given keys, split them up based on their prefix.
 
1241
 
 
1242
        To keep memory pressure somewhat under control, split the
 
1243
        requests back into per-file-id requests, otherwise "bzr co"
 
1244
        extracts the full tree into memory before writing it to disk.
 
1245
        This should be revisited if _get_content_maps() can ever cross
 
1246
        file-id boundaries.
 
1247
 
 
1248
        The keys for a given file_id are kept in the same relative order.
 
1249
        Ordering between file_ids is not, though prefix_order will return the
 
1250
        order that the key was first seen.
 
1251
 
 
1252
        :param keys: An iterable of key tuples
 
1253
        :return: (split_map, prefix_order)
 
1254
            split_map       A dictionary mapping prefix => keys
 
1255
            prefix_order    The order that we saw the various prefixes
 
1256
        """
 
1257
        split_by_prefix = {}
 
1258
        prefix_order = []
 
1259
        for key in keys:
 
1260
            if len(key) == 1:
 
1261
                prefix = ''
 
1262
            else:
 
1263
                prefix = key[0]
 
1264
 
 
1265
            if prefix in split_by_prefix:
 
1266
                split_by_prefix[prefix].append(key)
 
1267
            else:
 
1268
                split_by_prefix[prefix] = [key]
 
1269
                prefix_order.append(prefix)
 
1270
        return split_by_prefix, prefix_order
 
1271
 
 
1272
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1273
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1274
        """For the given keys, group them into 'best-sized' requests.
 
1275
 
 
1276
        The idea is to avoid making 1 request per file, but to never try to
 
1277
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1278
        possible, we should try to group requests to the same pack file
 
1279
        together.
 
1280
 
 
1281
        :return: list of (keys, non_local) tuples that indicate what keys
 
1282
            should be fetched next.
 
1283
        """
 
1284
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1285
        #       from the same pack file together, and we want to extract all
 
1286
        #       the texts for a given build-chain together. Ultimately it
 
1287
        #       probably needs a better global view.
 
1288
        total_keys = len(keys)
 
1289
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1290
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1291
        cur_keys = []
 
1292
        cur_non_local = set()
 
1293
        cur_size = 0
 
1294
        result = []
 
1295
        sizes = []
 
1296
        for prefix in prefix_order:
 
1297
            keys = prefix_split_keys[prefix]
 
1298
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1299
 
 
1300
            this_size = self._index._get_total_build_size(keys, positions)
 
1301
            cur_size += this_size
 
1302
            cur_keys.extend(keys)
 
1303
            cur_non_local.update(non_local)
 
1304
            if cur_size > _min_buffer_size:
 
1305
                result.append((cur_keys, cur_non_local))
 
1306
                sizes.append(cur_size)
 
1307
                cur_keys = []
 
1308
                cur_non_local = set()
 
1309
                cur_size = 0
 
1310
        if cur_keys:
 
1311
            result.append((cur_keys, cur_non_local))
 
1312
            sizes.append(cur_size)
 
1313
        return result
 
1314
 
 
1315
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
1316
        """Get a stream of records for keys.
 
1317
 
 
1318
        :param keys: The keys to include.
 
1319
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
1320
            sorted stream has compression parents strictly before their
 
1321
            children.
 
1322
        :param include_delta_closure: If True then the closure across any
 
1323
            compression parents will be included (in the opaque data).
 
1324
        :return: An iterator of ContentFactory objects, each of which is only
 
1325
            valid until the iterator is advanced.
 
1326
        """
 
1327
        # keys might be a generator
 
1328
        keys = set(keys)
 
1329
        if not keys:
 
1330
            return
 
1331
        if not self._index.has_graph:
 
1332
            # Cannot sort when no graph has been stored.
 
1333
            ordering = 'unordered'
 
1334
 
 
1335
        remaining_keys = keys
 
1336
        while True:
 
1337
            try:
 
1338
                keys = set(remaining_keys)
 
1339
                for content_factory in self._get_remaining_record_stream(keys,
 
1340
                                            ordering, include_delta_closure):
 
1341
                    remaining_keys.discard(content_factory.key)
 
1342
                    yield content_factory
 
1343
                return
 
1344
            except errors.RetryWithNewPacks, e:
 
1345
                self._access.reload_or_raise(e)
 
1346
 
 
1347
    def _get_remaining_record_stream(self, keys, ordering,
 
1348
                                     include_delta_closure):
 
1349
        """This function is the 'retry' portion for get_record_stream."""
 
1350
        if include_delta_closure:
 
1351
            positions = self._get_components_positions(keys, allow_missing=True)
 
1352
        else:
 
1353
            build_details = self._index.get_build_details(keys)
 
1354
            # map from key to
 
1355
            # (record_details, access_memo, compression_parent_key)
 
1356
            positions = dict((key, self._build_details_to_components(details))
 
1357
                for key, details in build_details.iteritems())
 
1358
        absent_keys = keys.difference(set(positions))
 
1359
        # There may be more absent keys : if we're missing the basis component
 
1360
        # and are trying to include the delta closure.
 
1361
        # XXX: We should not ever need to examine remote sources because we do
 
1362
        # not permit deltas across versioned files boundaries.
 
1363
        if include_delta_closure:
 
1364
            needed_from_fallback = set()
 
1365
            # Build up reconstructable_keys dict.  key:True in this dict means
 
1366
            # the key can be reconstructed.
 
1367
            reconstructable_keys = {}
 
1368
            for key in keys:
 
1369
                # the delta chain
 
1370
                try:
 
1371
                    chain = [key, positions[key][2]]
 
1372
                except KeyError:
 
1373
                    needed_from_fallback.add(key)
 
1374
                    continue
 
1375
                result = True
 
1376
                while chain[-1] is not None:
 
1377
                    if chain[-1] in reconstructable_keys:
 
1378
                        result = reconstructable_keys[chain[-1]]
 
1379
                        break
 
1380
                    else:
 
1381
                        try:
 
1382
                            chain.append(positions[chain[-1]][2])
 
1383
                        except KeyError:
 
1384
                            # missing basis component
 
1385
                            needed_from_fallback.add(chain[-1])
 
1386
                            result = True
 
1387
                            break
 
1388
                for chain_key in chain[:-1]:
 
1389
                    reconstructable_keys[chain_key] = result
 
1390
                if not result:
 
1391
                    needed_from_fallback.add(key)
 
1392
        # Double index lookups here : need a unified api ?
 
1393
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
 
1394
        if ordering in ('topological', 'groupcompress'):
 
1395
            if ordering == 'topological':
 
1396
                # Global topological sort
 
1397
                present_keys = tsort.topo_sort(global_map)
 
1398
            else:
 
1399
                present_keys = sort_groupcompress(global_map)
 
1400
            # Now group by source:
 
1401
            source_keys = []
 
1402
            current_source = None
 
1403
            for key in present_keys:
 
1404
                for parent_map in parent_maps:
 
1405
                    if key in parent_map:
 
1406
                        key_source = parent_map
 
1407
                        break
 
1408
                if current_source is not key_source:
 
1409
                    source_keys.append((key_source, []))
 
1410
                    current_source = key_source
 
1411
                source_keys[-1][1].append(key)
 
1412
        else:
 
1413
            if ordering != 'unordered':
 
1414
                raise AssertionError('valid values for ordering are:'
 
1415
                    ' "unordered", "groupcompress" or "topological" not: %r'
 
1416
                    % (ordering,))
 
1417
            # Just group by source; remote sources first.
 
1418
            present_keys = []
 
1419
            source_keys = []
 
1420
            for parent_map in reversed(parent_maps):
 
1421
                source_keys.append((parent_map, []))
 
1422
                for key in parent_map:
 
1423
                    present_keys.append(key)
 
1424
                    source_keys[-1][1].append(key)
 
1425
            # We have been requested to return these records in an order that
 
1426
            # suits us. So we ask the index to give us an optimally sorted
 
1427
            # order.
 
1428
            for source, sub_keys in source_keys:
 
1429
                if source is parent_maps[0]:
 
1430
                    # Only sort the keys for this VF
 
1431
                    self._index._sort_keys_by_io(sub_keys, positions)
 
1432
        absent_keys = keys - set(global_map)
 
1433
        for key in absent_keys:
 
1434
            yield AbsentContentFactory(key)
 
1435
        # restrict our view to the keys we can answer.
 
1436
        # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
 
1437
        # XXX: At that point we need to consider the impact of double reads by
 
1438
        # utilising components multiple times.
 
1439
        if include_delta_closure:
 
1440
            # XXX: get_content_maps performs its own index queries; allow state
 
1441
            # to be passed in.
 
1442
            non_local_keys = needed_from_fallback - absent_keys
 
1443
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1444
                                                                non_local_keys,
 
1445
                                                                positions):
 
1446
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1447
                                                   global_map)
 
1448
                for record in generator.get_record_stream():
 
1449
                    yield record
 
1450
        else:
 
1451
            for source, keys in source_keys:
 
1452
                if source is parent_maps[0]:
 
1453
                    # this KnitVersionedFiles
 
1454
                    records = [(key, positions[key][1]) for key in keys]
 
1455
                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
 
1456
                        (record_details, index_memo, _) = positions[key]
 
1457
                        yield KnitContentFactory(key, global_map[key],
 
1458
                            record_details, sha1, raw_data, self._factory.annotated, None)
 
1459
                else:
 
1460
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
 
1461
                    for record in vf.get_record_stream(keys, ordering,
 
1462
                        include_delta_closure):
 
1463
                        yield record
 
1464
 
 
1465
    def get_sha1s(self, keys):
 
1466
        """See VersionedFiles.get_sha1s()."""
 
1467
        missing = set(keys)
 
1468
        record_map = self._get_record_map(missing, allow_missing=True)
 
1469
        result = {}
 
1470
        for key, details in record_map.iteritems():
 
1471
            if key not in missing:
 
1472
                continue
 
1473
            # record entry 2 is the 'digest'.
 
1474
            result[key] = details[2]
 
1475
        missing.difference_update(set(result))
 
1476
        for source in self._fallback_vfs:
 
1477
            if not missing:
 
1478
                break
 
1479
            new_result = source.get_sha1s(missing)
 
1480
            result.update(new_result)
 
1481
            missing.difference_update(set(new_result))
 
1482
        return result
 
1483
 
 
1484
    def insert_record_stream(self, stream):
 
1485
        """Insert a record stream into this container.
 
1486
 
 
1487
        :param stream: A stream of records to insert.
 
1488
        :return: None
 
1489
        :seealso VersionedFiles.get_record_stream:
 
1490
        """
 
1491
        def get_adapter(adapter_key):
 
1492
            try:
 
1493
                return adapters[adapter_key]
 
1494
            except KeyError:
 
1495
                adapter_factory = adapter_registry.get(adapter_key)
 
1496
                adapter = adapter_factory(self)
 
1497
                adapters[adapter_key] = adapter
 
1498
                return adapter
 
1499
        delta_types = set()
 
1500
        if self._factory.annotated:
 
1501
            # self is annotated, we need annotated knits to use directly.
 
1502
            annotated = "annotated-"
 
1503
            convertibles = []
 
1504
        else:
 
1505
            # self is not annotated, but we can strip annotations cheaply.
 
1506
            annotated = ""
 
1507
            convertibles = set(["knit-annotated-ft-gz"])
 
1508
            if self._max_delta_chain:
 
1509
                delta_types.add("knit-annotated-delta-gz")
 
1510
                convertibles.add("knit-annotated-delta-gz")
 
1511
        # The set of types we can cheaply adapt without needing basis texts.
 
1512
        native_types = set()
 
1513
        if self._max_delta_chain:
 
1514
            native_types.add("knit-%sdelta-gz" % annotated)
 
1515
            delta_types.add("knit-%sdelta-gz" % annotated)
 
1516
        native_types.add("knit-%sft-gz" % annotated)
 
1517
        knit_types = native_types.union(convertibles)
 
1518
        adapters = {}
 
1519
        # Buffer all index entries that we can't add immediately because their
 
1520
        # basis parent is missing. We don't buffer all because generating
 
1521
        # annotations may require access to some of the new records. However we
 
1522
        # can't generate annotations from new deltas until their basis parent
 
1523
        # is present anyway, so we get away with not needing an index that
 
1524
        # includes the new keys.
 
1525
        #
 
1526
        # See <http://launchpad.net/bugs/300177> about ordering of compression
 
1527
        # parents in the records - to be conservative, we insist that all
 
1528
        # parents must be present to avoid expanding to a fulltext.
 
1529
        #
 
1530
        # key = basis_parent, value = index entry to add
 
1531
        buffered_index_entries = {}
 
1532
        for record in stream:
 
1533
            buffered = False
 
1534
            parents = record.parents
 
1535
            if record.storage_kind in delta_types:
 
1536
                # TODO: eventually the record itself should track
 
1537
                #       compression_parent
 
1538
                compression_parent = parents[0]
 
1539
            else:
 
1540
                compression_parent = None
 
1541
            # Raise an error when a record is missing.
 
1542
            if record.storage_kind == 'absent':
 
1543
                raise RevisionNotPresent([record.key], self)
 
1544
            elif ((record.storage_kind in knit_types)
 
1545
                  and (compression_parent is None
 
1546
                       or not self._fallback_vfs
 
1547
                       or self._index.has_key(compression_parent)
 
1548
                       or not self.has_key(compression_parent))):
 
1549
                # we can insert the knit record literally if either it has no
 
1550
                # compression parent OR we already have its basis in this kvf
 
1551
                # OR the basis is not present even in the fallbacks.  In the
 
1552
                # last case it will either turn up later in the stream and all
 
1553
                # will be well, or it won't turn up at all and we'll raise an
 
1554
                # error at the end.
 
1555
                #
 
1556
                # TODO: self.has_key is somewhat redundant with
 
1557
                # self._index.has_key; we really want something that directly
 
1558
                # asks if it's only present in the fallbacks. -- mbp 20081119
 
1559
                if record.storage_kind not in native_types:
 
1560
                    try:
 
1561
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1562
                        adapter = get_adapter(adapter_key)
 
1563
                    except KeyError:
 
1564
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1565
                        adapter = get_adapter(adapter_key)
 
1566
                    bytes = adapter.get_bytes(record)
 
1567
                else:
 
1568
                    # It's a knit record, it has a _raw_record field (even if
 
1569
                    # it was reconstituted from a network stream).
 
1570
                    bytes = record._raw_record
 
1571
                options = [record._build_details[0]]
 
1572
                if record._build_details[1]:
 
1573
                    options.append('no-eol')
 
1574
                # Just blat it across.
 
1575
                # Note: This does end up adding data on duplicate keys. As
 
1576
                # modern repositories use atomic insertions this should not
 
1577
                # lead to excessive growth in the event of interrupted fetches.
 
1578
                # 'knit' repositories may suffer excessive growth, but as a
 
1579
                # deprecated format this is tolerable. It can be fixed if
 
1580
                # needed by in the kndx index support raising on a duplicate
 
1581
                # add with identical parents and options.
 
1582
                access_memo = self._access.add_raw_records(
 
1583
                    [(record.key, len(bytes))], bytes)[0]
 
1584
                index_entry = (record.key, options, access_memo, parents)
 
1585
                if 'fulltext' not in options:
 
1586
                    # Not a fulltext, so we need to make sure the compression
 
1587
                    # parent will also be present.
 
1588
                    # Note that pack backed knits don't need to buffer here
 
1589
                    # because they buffer all writes to the transaction level,
 
1590
                    # but we don't expose that difference at the index level. If
 
1591
                    # the query here has sufficient cost to show up in
 
1592
                    # profiling we should do that.
 
1593
                    #
 
1594
                    # They're required to be physically in this
 
1595
                    # KnitVersionedFiles, not in a fallback.
 
1596
                    if not self._index.has_key(compression_parent):
 
1597
                        pending = buffered_index_entries.setdefault(
 
1598
                            compression_parent, [])
 
1599
                        pending.append(index_entry)
 
1600
                        buffered = True
 
1601
                if not buffered:
 
1602
                    self._index.add_records([index_entry])
 
1603
            elif record.storage_kind == 'chunked':
 
1604
                self.add_lines(record.key, parents,
 
1605
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
 
1606
            else:
 
1607
                # Not suitable for direct insertion as a
 
1608
                # delta, either because it's not the right format, or this
 
1609
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
 
1610
                # 0) or because it depends on a base only present in the
 
1611
                # fallback kvfs.
 
1612
                self._access.flush()
 
1613
                try:
 
1614
                    # Try getting a fulltext directly from the record.
 
1615
                    bytes = record.get_bytes_as('fulltext')
 
1616
                except errors.UnavailableRepresentation:
 
1617
                    adapter_key = record.storage_kind, 'fulltext'
 
1618
                    adapter = get_adapter(adapter_key)
 
1619
                    bytes = adapter.get_bytes(record)
 
1620
                lines = split_lines(bytes)
 
1621
                try:
 
1622
                    self.add_lines(record.key, parents, lines)
 
1623
                except errors.RevisionAlreadyPresent:
 
1624
                    pass
 
1625
            # Add any records whose basis parent is now available.
 
1626
            if not buffered:
 
1627
                added_keys = [record.key]
 
1628
                while added_keys:
 
1629
                    key = added_keys.pop(0)
 
1630
                    if key in buffered_index_entries:
 
1631
                        index_entries = buffered_index_entries[key]
 
1632
                        self._index.add_records(index_entries)
 
1633
                        added_keys.extend(
 
1634
                            [index_entry[0] for index_entry in index_entries])
 
1635
                        del buffered_index_entries[key]
 
1636
        if buffered_index_entries:
 
1637
            # There were index entries buffered at the end of the stream,
 
1638
            # So these need to be added (if the index supports holding such
 
1639
            # entries for later insertion)
 
1640
            for key in buffered_index_entries:
 
1641
                index_entries = buffered_index_entries[key]
 
1642
                self._index.add_records(index_entries,
 
1643
                    missing_compression_parents=True)
 
1644
 
 
1645
    def get_missing_compression_parent_keys(self):
 
1646
        """Return an iterable of keys of missing compression parents.
 
1647
 
 
1648
        Check this after calling insert_record_stream to find out if there are
 
1649
        any missing compression parents.  If there are, the records that
 
1650
        depend on them are not able to be inserted safely. For atomic
 
1651
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1652
        suspended - commit will fail at this point. Nonatomic knits will error
 
1653
        earlier because they have no staging area to put pending entries into.
 
1654
        """
 
1655
        return self._index.get_missing_compression_parents()
 
1656
 
 
1657
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
1658
        """Iterate over the lines in the versioned files from keys.
 
1659
 
 
1660
        This may return lines from other keys. Each item the returned
 
1661
        iterator yields is a tuple of a line and a text version that that line
 
1662
        is present in (not introduced in).
 
1663
 
 
1664
        Ordering of results is in whatever order is most suitable for the
 
1665
        underlying storage format.
 
1666
 
 
1667
        If a progress bar is supplied, it may be used to indicate progress.
 
1668
        The caller is responsible for cleaning up progress bars (because this
 
1669
        is an iterator).
 
1670
 
 
1671
        NOTES:
 
1672
         * Lines are normalised by the underlying store: they will all have \\n
 
1673
           terminators.
 
1674
         * Lines are returned in arbitrary order.
 
1675
         * If a requested key did not change any lines (or didn't have any
 
1676
           lines), it may not be mentioned at all in the result.
 
1677
 
 
1678
        :param pb: Progress bar supplied by caller.
 
1679
        :return: An iterator over (line, key).
 
1680
        """
 
1681
        if pb is None:
 
1682
            pb = progress.DummyProgress()
 
1683
        keys = set(keys)
 
1684
        total = len(keys)
 
1685
        done = False
 
1686
        while not done:
 
1687
            try:
 
1688
                # we don't care about inclusions, the caller cares.
 
1689
                # but we need to setup a list of records to visit.
 
1690
                # we need key, position, length
 
1691
                key_records = []
 
1692
                build_details = self._index.get_build_details(keys)
 
1693
                for key, details in build_details.iteritems():
 
1694
                    if key in keys:
 
1695
                        key_records.append((key, details[0]))
 
1696
                records_iter = enumerate(self._read_records_iter(key_records))
 
1697
                for (key_idx, (key, data, sha_value)) in records_iter:
 
1698
                    pb.update('Walking content', key_idx, total)
 
1699
                    compression_parent = build_details[key][1]
 
1700
                    if compression_parent is None:
 
1701
                        # fulltext
 
1702
                        line_iterator = self._factory.get_fulltext_content(data)
 
1703
                    else:
 
1704
                        # Delta
 
1705
                        line_iterator = self._factory.get_linedelta_content(data)
 
1706
                    # Now that we are yielding the data for this key, remove it
 
1707
                    # from the list
 
1708
                    keys.remove(key)
 
1709
                    # XXX: It might be more efficient to yield (key,
 
1710
                    # line_iterator) in the future. However for now, this is a
 
1711
                    # simpler change to integrate into the rest of the
 
1712
                    # codebase. RBC 20071110
 
1713
                    for line in line_iterator:
 
1714
                        yield line, key
 
1715
                done = True
 
1716
            except errors.RetryWithNewPacks, e:
 
1717
                self._access.reload_or_raise(e)
 
1718
        # If there are still keys we've not yet found, we look in the fallback
 
1719
        # vfs, and hope to find them there.  Note that if the keys are found
 
1720
        # but had no changes or no content, the fallback may not return
 
1721
        # anything.
 
1722
        if keys and not self._fallback_vfs:
 
1723
            # XXX: strictly the second parameter is meant to be the file id
 
1724
            # but it's not easily accessible here.
 
1725
            raise RevisionNotPresent(keys, repr(self))
 
1726
        for source in self._fallback_vfs:
 
1727
            if not keys:
 
1728
                break
 
1729
            source_keys = set()
 
1730
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
 
1731
                source_keys.add(key)
 
1732
                yield line, key
 
1733
            keys.difference_update(source_keys)
 
1734
        pb.update('Walking content', total, total)
 
1735
 
 
1736
    def _make_line_delta(self, delta_seq, new_content):
 
1737
        """Generate a line delta from delta_seq and new_content."""
 
1738
        diff_hunks = []
 
1739
        for op in delta_seq.get_opcodes():
 
1740
            if op[0] == 'equal':
 
1741
                continue
 
1742
            diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
 
1743
        return diff_hunks
 
1744
 
 
1745
    def _merge_annotations(self, content, parents, parent_texts={},
 
1746
                           delta=None, annotated=None,
 
1747
                           left_matching_blocks=None):
 
1748
        """Merge annotations for content and generate deltas.
 
1749
 
 
1750
        This is done by comparing the annotations based on changes to the text
 
1751
        and generating a delta on the resulting full texts. If annotations are
 
1752
        not being created then a simple delta is created.
 
1753
        """
 
1754
        if left_matching_blocks is not None:
 
1755
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
 
1756
        else:
 
1757
            delta_seq = None
 
1758
        if annotated:
 
1759
            for parent_key in parents:
 
1760
                merge_content = self._get_content(parent_key, parent_texts)
 
1761
                if (parent_key == parents[0] and delta_seq is not None):
 
1762
                    seq = delta_seq
 
1763
                else:
 
1764
                    seq = patiencediff.PatienceSequenceMatcher(
 
1765
                        None, merge_content.text(), content.text())
 
1766
                for i, j, n in seq.get_matching_blocks():
 
1767
                    if n == 0:
 
1768
                        continue
 
1769
                    # this copies (origin, text) pairs across to the new
 
1770
                    # content for any line that matches the last-checked
 
1771
                    # parent.
 
1772
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
 
1773
            # XXX: Robert says the following block is a workaround for a
 
1774
            # now-fixed bug and it can probably be deleted. -- mbp 20080618
 
1775
            if content._lines and content._lines[-1][1][-1] != '\n':
 
1776
                # The copied annotation was from a line without a trailing EOL,
 
1777
                # reinstate one for the content object, to ensure correct
 
1778
                # serialization.
 
1779
                line = content._lines[-1][1] + '\n'
 
1780
                content._lines[-1] = (content._lines[-1][0], line)
 
1781
        if delta:
 
1782
            if delta_seq is None:
 
1783
                reference_content = self._get_content(parents[0], parent_texts)
 
1784
                new_texts = content.text()
 
1785
                old_texts = reference_content.text()
 
1786
                delta_seq = patiencediff.PatienceSequenceMatcher(
 
1787
                                                 None, old_texts, new_texts)
 
1788
            return self._make_line_delta(delta_seq, content)
 
1789
 
 
1790
    def _parse_record(self, version_id, data):
 
1791
        """Parse an original format knit record.
 
1792
 
 
1793
        These have the last element of the key only present in the stored data.
 
1794
        """
 
1795
        rec, record_contents = self._parse_record_unchecked(data)
 
1796
        self._check_header_version(rec, version_id)
 
1797
        return record_contents, rec[3]
 
1798
 
 
1799
    def _parse_record_header(self, key, raw_data):
 
1800
        """Parse a record header for consistency.
 
1801
 
 
1802
        :return: the header and the decompressor stream.
 
1803
                 as (stream, header_record)
 
1804
        """
 
1805
        df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(raw_data))
 
1806
        try:
 
1807
            # Current serialise
 
1808
            rec = self._check_header(key, df.readline())
 
1809
        except Exception, e:
 
1810
            raise KnitCorrupt(self,
 
1811
                              "While reading {%s} got %s(%s)"
 
1812
                              % (key, e.__class__.__name__, str(e)))
 
1813
        return df, rec
 
1814
 
 
1815
    def _parse_record_unchecked(self, data):
 
1816
        # profiling notes:
 
1817
        # 4168 calls in 2880 217 internal
 
1818
        # 4168 calls to _parse_record_header in 2121
 
1819
        # 4168 calls to readlines in 330
 
1820
        df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(data))
 
1821
        try:
 
1822
            record_contents = df.readlines()
 
1823
        except Exception, e:
 
1824
            raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
 
1825
                (data, e.__class__.__name__, str(e)))
 
1826
        header = record_contents.pop(0)
 
1827
        rec = self._split_header(header)
 
1828
        last_line = record_contents.pop()
 
1829
        if len(record_contents) != int(rec[2]):
 
1830
            raise KnitCorrupt(self,
 
1831
                              'incorrect number of lines %s != %s'
 
1832
                              ' for version {%s} %s'
 
1833
                              % (len(record_contents), int(rec[2]),
 
1834
                                 rec[1], record_contents))
 
1835
        if last_line != 'end %s\n' % rec[1]:
 
1836
            raise KnitCorrupt(self,
 
1837
                              'unexpected version end line %r, wanted %r'
 
1838
                              % (last_line, rec[1]))
 
1839
        df.close()
 
1840
        return rec, record_contents
 
1841
 
 
1842
    def _read_records_iter(self, records):
 
1843
        """Read text records from data file and yield result.
 
1844
 
 
1845
        The result will be returned in whatever is the fastest to read.
 
1846
        Not by the order requested. Also, multiple requests for the same
 
1847
        record will only yield 1 response.
 
1848
        :param records: A list of (key, access_memo) entries
 
1849
        :return: Yields (key, contents, digest) in the order
 
1850
                 read, not the order requested
 
1851
        """
 
1852
        if not records:
 
1853
            return
 
1854
 
 
1855
        # XXX: This smells wrong, IO may not be getting ordered right.
 
1856
        needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1857
        if not needed_records:
 
1858
            return
 
1859
 
 
1860
        # The transport optimizes the fetching as well
 
1861
        # (ie, reads continuous ranges.)
 
1862
        raw_data = self._access.get_raw_records(
 
1863
            [index_memo for key, index_memo in needed_records])
 
1864
 
 
1865
        for (key, index_memo), data in \
 
1866
                izip(iter(needed_records), raw_data):
 
1867
            content, digest = self._parse_record(key[-1], data)
 
1868
            yield key, content, digest
 
1869
 
 
1870
    def _read_records_iter_raw(self, records):
 
1871
        """Read text records from data file and yield raw data.
 
1872
 
 
1873
        This unpacks enough of the text record to validate the id is
 
1874
        as expected but thats all.
 
1875
 
 
1876
        Each item the iterator yields is (key, bytes,
 
1877
            expected_sha1_of_full_text).
 
1878
        """
 
1879
        for key, data in self._read_records_iter_unchecked(records):
 
1880
            # validate the header (note that we can only use the suffix in
 
1881
            # current knit records).
 
1882
            df, rec = self._parse_record_header(key, data)
 
1883
            df.close()
 
1884
            yield key, data, rec[3]
 
1885
 
 
1886
    def _read_records_iter_unchecked(self, records):
 
1887
        """Read text records from data file and yield raw data.
 
1888
 
 
1889
        No validation is done.
 
1890
 
 
1891
        Yields tuples of (key, data).
 
1892
        """
 
1893
        # setup an iterator of the external records:
 
1894
        # uses readv so nice and fast we hope.
 
1895
        if len(records):
 
1896
            # grab the disk data needed.
 
1897
            needed_offsets = [index_memo for key, index_memo
 
1898
                                           in records]
 
1899
            raw_records = self._access.get_raw_records(needed_offsets)
 
1900
 
 
1901
        for key, index_memo in records:
 
1902
            data = raw_records.next()
 
1903
            yield key, data
 
1904
 
 
1905
    def _record_to_data(self, key, digest, lines, dense_lines=None):
 
1906
        """Convert key, digest, lines into a raw data block.
 
1907
 
 
1908
        :param key: The key of the record. Currently keys are always serialised
 
1909
            using just the trailing component.
 
1910
        :param dense_lines: The bytes of lines but in a denser form. For
 
1911
            instance, if lines is a list of 1000 bytestrings each ending in \n,
 
1912
            dense_lines may be a list with one line in it, containing all the
 
1913
            1000's lines and their \n's. Using dense_lines if it is already
 
1914
            known is a win because the string join to create bytes in this
 
1915
            function spends less time resizing the final string.
 
1916
        :return: (len, a StringIO instance with the raw data ready to read.)
 
1917
        """
 
1918
        # Note: using a string copy here increases memory pressure with e.g.
 
1919
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
 
1920
        # when doing the initial commit of a mozilla tree. RBC 20070921
 
1921
        bytes = ''.join(chain(
 
1922
            ["version %s %d %s\n" % (key[-1],
 
1923
                                     len(lines),
 
1924
                                     digest)],
 
1925
            dense_lines or lines,
 
1926
            ["end %s\n" % key[-1]]))
 
1927
        if type(bytes) != str:
 
1928
            raise AssertionError(
 
1929
                'data must be plain bytes was %s' % type(bytes))
 
1930
        if lines and lines[-1][-1] != '\n':
 
1931
            raise ValueError('corrupt lines value %r' % lines)
 
1932
        compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
 
1933
        return len(compressed_bytes), compressed_bytes
 
1934
 
 
1935
    def _split_header(self, line):
 
1936
        rec = line.split()
 
1937
        if len(rec) != 4:
 
1938
            raise KnitCorrupt(self,
 
1939
                              'unexpected number of elements in record header')
 
1940
        return rec
 
1941
 
 
1942
    def keys(self):
 
1943
        """See VersionedFiles.keys."""
 
1944
        if 'evil' in debug.debug_flags:
 
1945
            trace.mutter_callsite(2, "keys scales with size of history")
 
1946
        sources = [self._index] + self._fallback_vfs
 
1947
        result = set()
 
1948
        for source in sources:
 
1949
            result.update(source.keys())
 
1950
        return result
 
1951
 
 
1952
 
 
1953
class _ContentMapGenerator(object):
 
1954
    """Generate texts or expose raw deltas for a set of texts."""
 
1955
 
 
1956
    def _get_content(self, key):
 
1957
        """Get the content object for key."""
 
1958
        # Note that _get_content is only called when the _ContentMapGenerator
 
1959
        # has been constructed with just one key requested for reconstruction.
 
1960
        if key in self.nonlocal_keys:
 
1961
            record = self.get_record_stream().next()
 
1962
            # Create a content object on the fly
 
1963
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1964
            return PlainKnitContent(lines, record.key)
 
1965
        else:
 
1966
            # local keys we can ask for directly
 
1967
            return self._get_one_work(key)
 
1968
 
 
1969
    def get_record_stream(self):
 
1970
        """Get a record stream for the keys requested during __init__."""
 
1971
        for record in self._work():
 
1972
            yield record
 
1973
 
 
1974
    def _work(self):
 
1975
        """Produce maps of text and KnitContents as dicts.
 
1976
 
 
1977
        :return: (text_map, content_map) where text_map contains the texts for
 
1978
            the requested versions and content_map contains the KnitContents.
 
1979
        """
 
1980
        # NB: By definition we never need to read remote sources unless texts
 
1981
        # are requested from them: we don't delta across stores - and we
 
1982
        # explicitly do not want to to prevent data loss situations.
 
1983
        if self.global_map is None:
 
1984
            self.global_map = self.vf.get_parent_map(self.keys)
 
1985
        nonlocal_keys = self.nonlocal_keys
 
1986
 
 
1987
        missing_keys = set(nonlocal_keys)
 
1988
        # Read from remote versioned file instances and provide to our caller.
 
1989
        for source in self.vf._fallback_vfs:
 
1990
            if not missing_keys:
 
1991
                break
 
1992
            # Loop over fallback repositories asking them for texts - ignore
 
1993
            # any missing from a particular fallback.
 
1994
            for record in source.get_record_stream(missing_keys,
 
1995
                'unordered', True):
 
1996
                if record.storage_kind == 'absent':
 
1997
                    # Not in thie particular stream, may be in one of the
 
1998
                    # other fallback vfs objects.
 
1999
                    continue
 
2000
                missing_keys.remove(record.key)
 
2001
                yield record
 
2002
 
 
2003
        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
 
2004
            allow_missing=True)
 
2005
        first = True
 
2006
        for key in self.keys:
 
2007
            if key in self.nonlocal_keys:
 
2008
                continue
 
2009
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2010
            first = False
 
2011
 
 
2012
    def _get_one_work(self, requested_key):
 
2013
        # Now, if we have calculated everything already, just return the
 
2014
        # desired text.
 
2015
        if requested_key in self._contents_map:
 
2016
            return self._contents_map[requested_key]
 
2017
        # To simplify things, parse everything at once - code that wants one text
 
2018
        # probably wants them all.
 
2019
        # FUTURE: This function could be improved for the 'extract many' case
 
2020
        # by tracking each component and only doing the copy when the number of
 
2021
        # children than need to apply delta's to it is > 1 or it is part of the
 
2022
        # final output.
 
2023
        multiple_versions = len(self.keys) != 1
 
2024
        if self._record_map is None:
 
2025
            self._record_map = self.vf._raw_map_to_record_map(
 
2026
                self._raw_record_map)
 
2027
        record_map = self._record_map
 
2028
        # raw_record_map is key:
 
2029
        # Have read and parsed records at this point.
 
2030
        for key in self.keys:
 
2031
            if key in self.nonlocal_keys:
 
2032
                # already handled
 
2033
                continue
 
2034
            components = []
 
2035
            cursor = key
 
2036
            while cursor is not None:
 
2037
                try:
 
2038
                    record, record_details, digest, next = record_map[cursor]
 
2039
                except KeyError:
 
2040
                    raise RevisionNotPresent(cursor, self)
 
2041
                components.append((cursor, record, record_details, digest))
 
2042
                cursor = next
 
2043
                if cursor in self._contents_map:
 
2044
                    # no need to plan further back
 
2045
                    components.append((cursor, None, None, None))
 
2046
                    break
 
2047
 
 
2048
            content = None
 
2049
            for (component_id, record, record_details,
 
2050
                 digest) in reversed(components):
 
2051
                if component_id in self._contents_map:
 
2052
                    content = self._contents_map[component_id]
 
2053
                else:
 
2054
                    content, delta = self._factory.parse_record(key[-1],
 
2055
                        record, record_details, content,
 
2056
                        copy_base_content=multiple_versions)
 
2057
                    if multiple_versions:
 
2058
                        self._contents_map[component_id] = content
 
2059
 
 
2060
            # digest here is the digest from the last applied component.
 
2061
            text = content.text()
 
2062
            actual_sha = sha_strings(text)
 
2063
            if actual_sha != digest:
 
2064
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2065
        if multiple_versions:
 
2066
            return self._contents_map[requested_key]
 
2067
        else:
 
2068
            return content
 
2069
 
 
2070
    def _wire_bytes(self):
 
2071
        """Get the bytes to put on the wire for 'key'.
 
2072
 
 
2073
        The first collection of bytes asked for returns the serialised
 
2074
        raw_record_map and the additional details (key, parent) for key.
 
2075
        Subsequent calls return just the additional details (key, parent).
 
2076
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2077
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2078
 
 
2079
        :param key: A key from the content generator.
 
2080
        :return: Bytes to put on the wire.
 
2081
        """
 
2082
        lines = []
 
2083
        # kind marker for dispatch on the far side,
 
2084
        lines.append('knit-delta-closure')
 
2085
        # Annotated or not
 
2086
        if self.vf._factory.annotated:
 
2087
            lines.append('annotated')
 
2088
        else:
 
2089
            lines.append('')
 
2090
        # then the list of keys
 
2091
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
2092
            if key not in self.nonlocal_keys]))
 
2093
        # then the _raw_record_map in serialised form:
 
2094
        map_byte_list = []
 
2095
        # for each item in the map:
 
2096
        # 1 line with key
 
2097
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2098
        # one line with method
 
2099
        # one line with noeol
 
2100
        # one line with next ('' for None)
 
2101
        # one line with byte count of the record bytes
 
2102
        # the record bytes
 
2103
        for key, (record_bytes, (method, noeol), next) in \
 
2104
            self._raw_record_map.iteritems():
 
2105
            key_bytes = '\x00'.join(key)
 
2106
            parents = self.global_map.get(key, None)
 
2107
            if parents is None:
 
2108
                parent_bytes = 'None:'
 
2109
            else:
 
2110
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2111
            method_bytes = method
 
2112
            if noeol:
 
2113
                noeol_bytes = "T"
 
2114
            else:
 
2115
                noeol_bytes = "F"
 
2116
            if next:
 
2117
                next_bytes = '\x00'.join(next)
 
2118
            else:
 
2119
                next_bytes = ''
 
2120
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2121
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2122
                len(record_bytes), record_bytes))
 
2123
        map_bytes = ''.join(map_byte_list)
 
2124
        lines.append(map_bytes)
 
2125
        bytes = '\n'.join(lines)
 
2126
        return bytes
 
2127
 
 
2128
 
 
2129
class _VFContentMapGenerator(_ContentMapGenerator):
 
2130
    """Content map generator reading from a VersionedFiles object."""
 
2131
 
 
2132
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2133
        global_map=None, raw_record_map=None):
 
2134
        """Create a _ContentMapGenerator.
 
2135
 
 
2136
        :param versioned_files: The versioned files that the texts are being
 
2137
            extracted from.
 
2138
        :param keys: The keys to produce content maps for.
 
2139
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2140
            which are known to not be in this knit, but rather in one of the
 
2141
            fallback knits.
 
2142
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2143
            This is required if get_record_stream() is to be used.
 
2144
        :param raw_record_map: A unparsed raw record map to use for answering
 
2145
            contents.
 
2146
        """
 
2147
        # The vf to source data from
 
2148
        self.vf = versioned_files
 
2149
        # The keys desired
 
2150
        self.keys = list(keys)
 
2151
        # Keys known to be in fallback vfs objects
 
2152
        if nonlocal_keys is None:
 
2153
            self.nonlocal_keys = set()
 
2154
        else:
 
2155
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2156
        # Parents data for keys to be returned in get_record_stream
 
2157
        self.global_map = global_map
 
2158
        # The chunked lists for self.keys in text form
 
2159
        self._text_map = {}
 
2160
        # A cache of KnitContent objects used in extracting texts.
 
2161
        self._contents_map = {}
 
2162
        # All the knit records needed to assemble the requested keys as full
 
2163
        # texts.
 
2164
        self._record_map = None
 
2165
        if raw_record_map is None:
 
2166
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2167
                allow_missing=True)
 
2168
        else:
 
2169
            self._raw_record_map = raw_record_map
 
2170
        # the factory for parsing records
 
2171
        self._factory = self.vf._factory
 
2172
 
 
2173
 
 
2174
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2175
    """Content map generator sourced from a network stream."""
 
2176
 
 
2177
    def __init__(self, bytes, line_end):
 
2178
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2179
        self._bytes = bytes
 
2180
        self.global_map = {}
 
2181
        self._raw_record_map = {}
 
2182
        self._contents_map = {}
 
2183
        self._record_map = None
 
2184
        self.nonlocal_keys = []
 
2185
        # Get access to record parsing facilities
 
2186
        self.vf = KnitVersionedFiles(None, None)
 
2187
        start = line_end
 
2188
        # Annotated or not
 
2189
        line_end = bytes.find('\n', start)
 
2190
        line = bytes[start:line_end]
 
2191
        start = line_end + 1
 
2192
        if line == 'annotated':
 
2193
            self._factory = KnitAnnotateFactory()
 
2194
        else:
 
2195
            self._factory = KnitPlainFactory()
 
2196
        # list of keys to emit in get_record_stream
 
2197
        line_end = bytes.find('\n', start)
 
2198
        line = bytes[start:line_end]
 
2199
        start = line_end + 1
 
2200
        self.keys = [
 
2201
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2202
            if segment]
 
2203
        # now a loop until the end. XXX: It would be nice if this was just a
 
2204
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2205
        # there is a decent sized gap stopping that at the moment.
 
2206
        end = len(bytes)
 
2207
        while start < end:
 
2208
            # 1 line with key
 
2209
            line_end = bytes.find('\n', start)
 
2210
            key = tuple(bytes[start:line_end].split('\x00'))
 
2211
            start = line_end + 1
 
2212
            # 1 line with parents (None: for None, '' for ())
 
2213
            line_end = bytes.find('\n', start)
 
2214
            line = bytes[start:line_end]
 
2215
            if line == 'None:':
 
2216
                parents = None
 
2217
            else:
 
2218
                parents = tuple(
 
2219
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2220
                     if segment])
 
2221
            self.global_map[key] = parents
 
2222
            start = line_end + 1
 
2223
            # one line with method
 
2224
            line_end = bytes.find('\n', start)
 
2225
            line = bytes[start:line_end]
 
2226
            method = line
 
2227
            start = line_end + 1
 
2228
            # one line with noeol
 
2229
            line_end = bytes.find('\n', start)
 
2230
            line = bytes[start:line_end]
 
2231
            noeol = line == "T"
 
2232
            start = line_end + 1
 
2233
            # one line with next ('' for None)
 
2234
            line_end = bytes.find('\n', start)
 
2235
            line = bytes[start:line_end]
 
2236
            if not line:
 
2237
                next = None
 
2238
            else:
 
2239
                next = tuple(bytes[start:line_end].split('\x00'))
 
2240
            start = line_end + 1
 
2241
            # one line with byte count of the record bytes
 
2242
            line_end = bytes.find('\n', start)
 
2243
            line = bytes[start:line_end]
 
2244
            count = int(line)
 
2245
            start = line_end + 1
 
2246
            # the record bytes
 
2247
            record_bytes = bytes[start:start+count]
 
2248
            start = start + count
 
2249
            # put it in the map
 
2250
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2251
 
 
2252
    def get_record_stream(self):
 
2253
        """Get a record stream for for keys requested by the bytestream."""
 
2254
        first = True
 
2255
        for key in self.keys:
 
2256
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2257
            first = False
 
2258
 
 
2259
    def _wire_bytes(self):
 
2260
        return self._bytes
 
2261
 
 
2262
 
 
2263
class _KndxIndex(object):
 
2264
    """Manages knit index files
 
2265
 
 
2266
    The index is kept in memory and read on startup, to enable
 
2267
    fast lookups of revision information.  The cursor of the index
 
2268
    file is always pointing to the end, making it easy to append
 
2269
    entries.
 
2270
 
 
2271
    _cache is a cache for fast mapping from version id to a Index
 
2272
    object.
 
2273
 
 
2274
    _history is a cache for fast mapping from indexes to version ids.
 
2275
 
 
2276
    The index data format is dictionary compressed when it comes to
 
2277
    parent references; a index entry may only have parents that with a
 
2278
    lover index number.  As a result, the index is topological sorted.
 
2279
 
 
2280
    Duplicate entries may be written to the index for a single version id
 
2281
    if this is done then the latter one completely replaces the former:
 
2282
    this allows updates to correct version and parent information.
 
2283
    Note that the two entries may share the delta, and that successive
 
2284
    annotations and references MUST point to the first entry.
 
2285
 
 
2286
    The index file on disc contains a header, followed by one line per knit
 
2287
    record. The same revision can be present in an index file more than once.
 
2288
    The first occurrence gets assigned a sequence number starting from 0.
 
2289
 
 
2290
    The format of a single line is
 
2291
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
 
2292
    REVISION_ID is a utf8-encoded revision id
 
2293
    FLAGS is a comma separated list of flags about the record. Values include
 
2294
        no-eol, line-delta, fulltext.
 
2295
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
 
2296
        that the the compressed data starts at.
 
2297
    LENGTH is the ascii representation of the length of the data file.
 
2298
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
 
2299
        REVISION_ID.
 
2300
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
 
2301
        revision id already in the knit that is a parent of REVISION_ID.
 
2302
    The ' :' marker is the end of record marker.
 
2303
 
 
2304
    partial writes:
 
2305
    when a write is interrupted to the index file, it will result in a line
 
2306
    that does not end in ' :'. If the ' :' is not present at the end of a line,
 
2307
    or at the end of the file, then the record that is missing it will be
 
2308
    ignored by the parser.
 
2309
 
 
2310
    When writing new records to the index file, the data is preceded by '\n'
 
2311
    to ensure that records always start on new lines even if the last write was
 
2312
    interrupted. As a result its normal for the last line in the index to be
 
2313
    missing a trailing newline. One can be added with no harmful effects.
 
2314
 
 
2315
    :ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
 
2316
        where prefix is e.g. the (fileid,) for .texts instances or () for
 
2317
        constant-mapped things like .revisions, and the old state is
 
2318
        tuple(history_vector, cache_dict).  This is used to prevent having an
 
2319
        ABI change with the C extension that reads .kndx files.
 
2320
    """
 
2321
 
 
2322
    HEADER = "# bzr knit index 8\n"
 
2323
 
 
2324
    def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
 
2325
        """Create a _KndxIndex on transport using mapper."""
 
2326
        self._transport = transport
 
2327
        self._mapper = mapper
 
2328
        self._get_scope = get_scope
 
2329
        self._allow_writes = allow_writes
 
2330
        self._is_locked = is_locked
 
2331
        self._reset_cache()
 
2332
        self.has_graph = True
 
2333
 
 
2334
    def add_records(self, records, random_id=False, missing_compression_parents=False):
 
2335
        """Add multiple records to the index.
 
2336
 
 
2337
        :param records: a list of tuples:
 
2338
                         (key, options, access_memo, parents).
 
2339
        :param random_id: If True the ids being added were randomly generated
 
2340
            and no check for existence will be performed.
 
2341
        :param missing_compression_parents: If True the records being added are
 
2342
            only compressed against texts already in the index (or inside
 
2343
            records). If False the records all refer to unavailable texts (or
 
2344
            texts inside records) as compression parents.
 
2345
        """
 
2346
        if missing_compression_parents:
 
2347
            # It might be nice to get the edge of the records. But keys isn't
 
2348
            # _wrong_.
 
2349
            keys = sorted(record[0] for record in records)
 
2350
            raise errors.RevisionNotPresent(keys, self)
 
2351
        paths = {}
 
2352
        for record in records:
 
2353
            key = record[0]
 
2354
            prefix = key[:-1]
 
2355
            path = self._mapper.map(key) + '.kndx'
 
2356
            path_keys = paths.setdefault(path, (prefix, []))
 
2357
            path_keys[1].append(record)
 
2358
        for path in sorted(paths):
 
2359
            prefix, path_keys = paths[path]
 
2360
            self._load_prefixes([prefix])
 
2361
            lines = []
 
2362
            orig_history = self._kndx_cache[prefix][1][:]
 
2363
            orig_cache = self._kndx_cache[prefix][0].copy()
 
2364
 
 
2365
            try:
 
2366
                for key, options, (_, pos, size), parents in path_keys:
 
2367
                    if parents is None:
 
2368
                        # kndx indices cannot be parentless.
 
2369
                        parents = ()
 
2370
                    line = "\n%s %s %s %s %s :" % (
 
2371
                        key[-1], ','.join(options), pos, size,
 
2372
                        self._dictionary_compress(parents))
 
2373
                    if type(line) != str:
 
2374
                        raise AssertionError(
 
2375
                            'data must be utf8 was %s' % type(line))
 
2376
                    lines.append(line)
 
2377
                    self._cache_key(key, options, pos, size, parents)
 
2378
                if len(orig_history):
 
2379
                    self._transport.append_bytes(path, ''.join(lines))
 
2380
                else:
 
2381
                    self._init_index(path, lines)
 
2382
            except:
 
2383
                # If any problems happen, restore the original values and re-raise
 
2384
                self._kndx_cache[prefix] = (orig_cache, orig_history)
 
2385
                raise
 
2386
 
 
2387
    def scan_unvalidated_index(self, graph_index):
 
2388
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2389
        # Because kndx files do not support atomic insertion via separate index
 
2390
        # files, they do not support this method.
 
2391
        raise NotImplementedError(self.scan_unvalidated_index)
 
2392
 
 
2393
    def get_missing_compression_parents(self):
 
2394
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2395
        # Because kndx files do not support atomic insertion via separate index
 
2396
        # files, they do not support this method.
 
2397
        raise NotImplementedError(self.get_missing_compression_parents)
 
2398
 
 
2399
    def _cache_key(self, key, options, pos, size, parent_keys):
 
2400
        """Cache a version record in the history array and index cache.
 
2401
 
 
2402
        This is inlined into _load_data for performance. KEEP IN SYNC.
 
2403
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
 
2404
         indexes).
 
2405
        """
 
2406
        prefix = key[:-1]
 
2407
        version_id = key[-1]
 
2408
        # last-element only for compatibilty with the C load_data.
 
2409
        parents = tuple(parent[-1] for parent in parent_keys)
 
2410
        for parent in parent_keys:
 
2411
            if parent[:-1] != prefix:
 
2412
                raise ValueError("mismatched prefixes for %r, %r" % (
 
2413
                    key, parent_keys))
 
2414
        cache, history = self._kndx_cache[prefix]
 
2415
        # only want the _history index to reference the 1st index entry
 
2416
        # for version_id
 
2417
        if version_id not in cache:
 
2418
            index = len(history)
 
2419
            history.append(version_id)
 
2420
        else:
 
2421
            index = cache[version_id][5]
 
2422
        cache[version_id] = (version_id,
 
2423
                                   options,
 
2424
                                   pos,
 
2425
                                   size,
 
2426
                                   parents,
 
2427
                                   index)
 
2428
 
 
2429
    def check_header(self, fp):
 
2430
        line = fp.readline()
 
2431
        if line == '':
 
2432
            # An empty file can actually be treated as though the file doesn't
 
2433
            # exist yet.
 
2434
            raise errors.NoSuchFile(self)
 
2435
        if line != self.HEADER:
 
2436
            raise KnitHeaderError(badline=line, filename=self)
 
2437
 
 
2438
    def _check_read(self):
 
2439
        if not self._is_locked():
 
2440
            raise errors.ObjectNotLocked(self)
 
2441
        if self._get_scope() != self._scope:
 
2442
            self._reset_cache()
 
2443
 
 
2444
    def _check_write_ok(self):
 
2445
        """Assert if not writes are permitted."""
 
2446
        if not self._is_locked():
 
2447
            raise errors.ObjectNotLocked(self)
 
2448
        if self._get_scope() != self._scope:
 
2449
            self._reset_cache()
 
2450
        if self._mode != 'w':
 
2451
            raise errors.ReadOnlyObjectDirtiedError(self)
 
2452
 
 
2453
    def get_build_details(self, keys):
 
2454
        """Get the method, index_memo and compression parent for keys.
 
2455
 
 
2456
        Ghosts are omitted from the result.
 
2457
 
 
2458
        :param keys: An iterable of keys.
 
2459
        :return: A dict of key:(index_memo, compression_parent, parents,
 
2460
            record_details).
 
2461
            index_memo
 
2462
                opaque structure to pass to read_records to extract the raw
 
2463
                data
 
2464
            compression_parent
 
2465
                Content that this record is built upon, may be None
 
2466
            parents
 
2467
                Logical parents of this node
 
2468
            record_details
 
2469
                extra information about the content which needs to be passed to
 
2470
                Factory.parse_record
 
2471
        """
 
2472
        parent_map = self.get_parent_map(keys)
 
2473
        result = {}
 
2474
        for key in keys:
 
2475
            if key not in parent_map:
 
2476
                continue # Ghost
 
2477
            method = self.get_method(key)
 
2478
            parents = parent_map[key]
 
2479
            if method == 'fulltext':
 
2480
                compression_parent = None
 
2481
            else:
 
2482
                compression_parent = parents[0]
 
2483
            noeol = 'no-eol' in self.get_options(key)
 
2484
            index_memo = self.get_position(key)
 
2485
            result[key] = (index_memo, compression_parent,
 
2486
                                  parents, (method, noeol))
 
2487
        return result
 
2488
 
 
2489
    def get_method(self, key):
 
2490
        """Return compression method of specified key."""
 
2491
        options = self.get_options(key)
 
2492
        if 'fulltext' in options:
 
2493
            return 'fulltext'
 
2494
        elif 'line-delta' in options:
 
2495
            return 'line-delta'
 
2496
        else:
 
2497
            raise errors.KnitIndexUnknownMethod(self, options)
 
2498
 
 
2499
    def get_options(self, key):
 
2500
        """Return a list representing options.
 
2501
 
 
2502
        e.g. ['foo', 'bar']
 
2503
        """
 
2504
        prefix, suffix = self._split_key(key)
 
2505
        self._load_prefixes([prefix])
 
2506
        try:
 
2507
            return self._kndx_cache[prefix][0][suffix][1]
 
2508
        except KeyError:
 
2509
            raise RevisionNotPresent(key, self)
 
2510
 
 
2511
    def get_parent_map(self, keys):
 
2512
        """Get a map of the parents of keys.
 
2513
 
 
2514
        :param keys: The keys to look up parents for.
 
2515
        :return: A mapping from keys to parents. Absent keys are absent from
 
2516
            the mapping.
 
2517
        """
 
2518
        # Parse what we need to up front, this potentially trades off I/O
 
2519
        # locality (.kndx and .knit in the same block group for the same file
 
2520
        # id) for less checking in inner loops.
 
2521
        prefixes = set(key[:-1] for key in keys)
 
2522
        self._load_prefixes(prefixes)
 
2523
        result = {}
 
2524
        for key in keys:
 
2525
            prefix = key[:-1]
 
2526
            try:
 
2527
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2528
            except KeyError:
 
2529
                pass
 
2530
            else:
 
2531
                result[key] = tuple(prefix + (suffix,) for
 
2532
                    suffix in suffix_parents)
 
2533
        return result
 
2534
 
 
2535
    def get_position(self, key):
 
2536
        """Return details needed to access the version.
 
2537
 
 
2538
        :return: a tuple (key, data position, size) to hand to the access
 
2539
            logic to get the record.
 
2540
        """
 
2541
        prefix, suffix = self._split_key(key)
 
2542
        self._load_prefixes([prefix])
 
2543
        entry = self._kndx_cache[prefix][0][suffix]
 
2544
        return key, entry[2], entry[3]
 
2545
 
 
2546
    has_key = _mod_index._has_key_from_parent_map
 
2547
 
 
2548
    def _init_index(self, path, extra_lines=[]):
 
2549
        """Initialize an index."""
 
2550
        sio = StringIO()
 
2551
        sio.write(self.HEADER)
 
2552
        sio.writelines(extra_lines)
 
2553
        sio.seek(0)
 
2554
        self._transport.put_file_non_atomic(path, sio,
 
2555
                            create_parent_dir=True)
 
2556
                           # self._create_parent_dir)
 
2557
                           # mode=self._file_mode,
 
2558
                           # dir_mode=self._dir_mode)
 
2559
 
 
2560
    def keys(self):
 
2561
        """Get all the keys in the collection.
 
2562
 
 
2563
        The keys are not ordered.
 
2564
        """
 
2565
        result = set()
 
2566
        # Identify all key prefixes.
 
2567
        # XXX: A bit hacky, needs polish.
 
2568
        if type(self._mapper) == ConstantMapper:
 
2569
            prefixes = [()]
 
2570
        else:
 
2571
            relpaths = set()
 
2572
            for quoted_relpath in self._transport.iter_files_recursive():
 
2573
                path, ext = os.path.splitext(quoted_relpath)
 
2574
                relpaths.add(path)
 
2575
            prefixes = [self._mapper.unmap(path) for path in relpaths]
 
2576
        self._load_prefixes(prefixes)
 
2577
        for prefix in prefixes:
 
2578
            for suffix in self._kndx_cache[prefix][1]:
 
2579
                result.add(prefix + (suffix,))
 
2580
        return result
 
2581
 
 
2582
    def _load_prefixes(self, prefixes):
 
2583
        """Load the indices for prefixes."""
 
2584
        self._check_read()
 
2585
        for prefix in prefixes:
 
2586
            if prefix not in self._kndx_cache:
 
2587
                # the load_data interface writes to these variables.
 
2588
                self._cache = {}
 
2589
                self._history = []
 
2590
                self._filename = prefix
 
2591
                try:
 
2592
                    path = self._mapper.map(prefix) + '.kndx'
 
2593
                    fp = self._transport.get(path)
 
2594
                    try:
 
2595
                        # _load_data may raise NoSuchFile if the target knit is
 
2596
                        # completely empty.
 
2597
                        _load_data(self, fp)
 
2598
                    finally:
 
2599
                        fp.close()
 
2600
                    self._kndx_cache[prefix] = (self._cache, self._history)
 
2601
                    del self._cache
 
2602
                    del self._filename
 
2603
                    del self._history
 
2604
                except NoSuchFile:
 
2605
                    self._kndx_cache[prefix] = ({}, [])
 
2606
                    if type(self._mapper) == ConstantMapper:
 
2607
                        # preserve behaviour for revisions.kndx etc.
 
2608
                        self._init_index(path)
 
2609
                    del self._cache
 
2610
                    del self._filename
 
2611
                    del self._history
 
2612
 
 
2613
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2614
 
 
2615
    def _partition_keys(self, keys):
 
2616
        """Turn keys into a dict of prefix:suffix_list."""
 
2617
        result = {}
 
2618
        for key in keys:
 
2619
            prefix_keys = result.setdefault(key[:-1], [])
 
2620
            prefix_keys.append(key[-1])
 
2621
        return result
 
2622
 
 
2623
    def _dictionary_compress(self, keys):
 
2624
        """Dictionary compress keys.
 
2625
 
 
2626
        :param keys: The keys to generate references to.
 
2627
        :return: A string representation of keys. keys which are present are
 
2628
            dictionary compressed, and others are emitted as fulltext with a
 
2629
            '.' prefix.
 
2630
        """
 
2631
        if not keys:
 
2632
            return ''
 
2633
        result_list = []
 
2634
        prefix = keys[0][:-1]
 
2635
        cache = self._kndx_cache[prefix][0]
 
2636
        for key in keys:
 
2637
            if key[:-1] != prefix:
 
2638
                # kndx indices cannot refer across partitioned storage.
 
2639
                raise ValueError("mismatched prefixes for %r" % keys)
 
2640
            if key[-1] in cache:
 
2641
                # -- inlined lookup() --
 
2642
                result_list.append(str(cache[key[-1]][5]))
 
2643
                # -- end lookup () --
 
2644
            else:
 
2645
                result_list.append('.' + key[-1])
 
2646
        return ' '.join(result_list)
 
2647
 
 
2648
    def _reset_cache(self):
 
2649
        # Possibly this should be a LRU cache. A dictionary from key_prefix to
 
2650
        # (cache_dict, history_vector) for parsed kndx files.
 
2651
        self._kndx_cache = {}
 
2652
        self._scope = self._get_scope()
 
2653
        allow_writes = self._allow_writes()
 
2654
        if allow_writes:
 
2655
            self._mode = 'w'
 
2656
        else:
 
2657
            self._mode = 'r'
 
2658
 
 
2659
    def _sort_keys_by_io(self, keys, positions):
 
2660
        """Figure out an optimal order to read the records for the given keys.
 
2661
 
 
2662
        Sort keys, grouped by index and sorted by position.
 
2663
 
 
2664
        :param keys: A list of keys whose records we want to read. This will be
 
2665
            sorted 'in-place'.
 
2666
        :param positions: A dict, such as the one returned by
 
2667
            _get_components_positions()
 
2668
        :return: None
 
2669
        """
 
2670
        def get_sort_key(key):
 
2671
            index_memo = positions[key][1]
 
2672
            # Group by prefix and position. index_memo[0] is the key, so it is
 
2673
            # (file_id, revision_id) and we don't want to sort on revision_id,
 
2674
            # index_memo[1] is the position, and index_memo[2] is the size,
 
2675
            # which doesn't matter for the sort
 
2676
            return index_memo[0][:-1], index_memo[1]
 
2677
        return keys.sort(key=get_sort_key)
 
2678
 
 
2679
    _get_total_build_size = _get_total_build_size
 
2680
 
 
2681
    def _split_key(self, key):
 
2682
        """Split key into a prefix and suffix."""
 
2683
        return key[:-1], key[-1]
 
2684
 
 
2685
 
 
2686
class _KnitGraphIndex(object):
 
2687
    """A KnitVersionedFiles index layered on GraphIndex."""
 
2688
 
 
2689
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
 
2690
        add_callback=None):
 
2691
        """Construct a KnitGraphIndex on a graph_index.
 
2692
 
 
2693
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
 
2694
        :param is_locked: A callback to check whether the object should answer
 
2695
            queries.
 
2696
        :param deltas: Allow delta-compressed records.
 
2697
        :param parents: If True, record knits parents, if not do not record
 
2698
            parents.
 
2699
        :param add_callback: If not None, allow additions to the index and call
 
2700
            this callback with a list of added GraphIndex nodes:
 
2701
            [(node, value, node_refs), ...]
 
2702
        :param is_locked: A callback, returns True if the index is locked and
 
2703
            thus usable.
 
2704
        """
 
2705
        self._add_callback = add_callback
 
2706
        self._graph_index = graph_index
 
2707
        self._deltas = deltas
 
2708
        self._parents = parents
 
2709
        if deltas and not parents:
 
2710
            # XXX: TODO: Delta tree and parent graph should be conceptually
 
2711
            # separate.
 
2712
            raise KnitCorrupt(self, "Cannot do delta compression without "
 
2713
                "parent tracking.")
 
2714
        self.has_graph = parents
 
2715
        self._is_locked = is_locked
 
2716
        self._missing_compression_parents = set()
 
2717
 
 
2718
    def __repr__(self):
 
2719
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
 
2720
 
 
2721
    def add_records(self, records, random_id=False,
 
2722
        missing_compression_parents=False):
 
2723
        """Add multiple records to the index.
 
2724
 
 
2725
        This function does not insert data into the Immutable GraphIndex
 
2726
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
2727
        the caller and checks that it is safe to insert then calls
 
2728
        self._add_callback with the prepared GraphIndex nodes.
 
2729
 
 
2730
        :param records: a list of tuples:
 
2731
                         (key, options, access_memo, parents).
 
2732
        :param random_id: If True the ids being added were randomly generated
 
2733
            and no check for existence will be performed.
 
2734
        :param missing_compression_parents: If True the records being added are
 
2735
            only compressed against texts already in the index (or inside
 
2736
            records). If False the records all refer to unavailable texts (or
 
2737
            texts inside records) as compression parents.
 
2738
        """
 
2739
        if not self._add_callback:
 
2740
            raise errors.ReadOnlyError(self)
 
2741
        # we hope there are no repositories with inconsistent parentage
 
2742
        # anymore.
 
2743
 
 
2744
        keys = {}
 
2745
        compression_parents = set()
 
2746
        for (key, options, access_memo, parents) in records:
 
2747
            if self._parents:
 
2748
                parents = tuple(parents)
 
2749
            index, pos, size = access_memo
 
2750
            if 'no-eol' in options:
 
2751
                value = 'N'
 
2752
            else:
 
2753
                value = ' '
 
2754
            value += "%d %d" % (pos, size)
 
2755
            if not self._deltas:
 
2756
                if 'line-delta' in options:
 
2757
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
 
2758
            if self._parents:
 
2759
                if self._deltas:
 
2760
                    if 'line-delta' in options:
 
2761
                        node_refs = (parents, (parents[0],))
 
2762
                        if missing_compression_parents:
 
2763
                            compression_parents.add(parents[0])
 
2764
                    else:
 
2765
                        node_refs = (parents, ())
 
2766
                else:
 
2767
                    node_refs = (parents, )
 
2768
            else:
 
2769
                if parents:
 
2770
                    raise KnitCorrupt(self, "attempt to add node with parents "
 
2771
                        "in parentless index.")
 
2772
                node_refs = ()
 
2773
            keys[key] = (value, node_refs)
 
2774
        # check for dups
 
2775
        if not random_id:
 
2776
            present_nodes = self._get_entries(keys)
 
2777
            for (index, key, value, node_refs) in present_nodes:
 
2778
                if (value[0] != keys[key][0][0] or
 
2779
                    node_refs[:1] != keys[key][1][:1]):
 
2780
                    raise KnitCorrupt(self, "inconsistent details in add_records"
 
2781
                        ": %s %s" % ((value, node_refs), keys[key]))
 
2782
                del keys[key]
 
2783
        result = []
 
2784
        if self._parents:
 
2785
            for key, (value, node_refs) in keys.iteritems():
 
2786
                result.append((key, value, node_refs))
 
2787
        else:
 
2788
            for key, (value, node_refs) in keys.iteritems():
 
2789
                result.append((key, value))
 
2790
        self._add_callback(result)
 
2791
        if missing_compression_parents:
 
2792
            # This may appear to be incorrect (it does not check for
 
2793
            # compression parents that are in the existing graph index),
 
2794
            # but such records won't have been buffered, so this is
 
2795
            # actually correct: every entry when
 
2796
            # missing_compression_parents==True either has a missing parent, or
 
2797
            # a parent that is one of the keys in records.
 
2798
            compression_parents.difference_update(keys)
 
2799
            self._missing_compression_parents.update(compression_parents)
 
2800
        # Adding records may have satisfied missing compression parents.
 
2801
        self._missing_compression_parents.difference_update(keys)
 
2802
 
 
2803
    def scan_unvalidated_index(self, graph_index):
 
2804
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2805
 
 
2806
        This allows this _KnitGraphIndex to keep track of any missing
 
2807
        compression parents we may want to have filled in to make those
 
2808
        indices valid.
 
2809
 
 
2810
        :param graph_index: A GraphIndex
 
2811
        """
 
2812
        if self._deltas:
 
2813
            new_missing = graph_index.external_references(ref_list_num=1)
 
2814
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2815
            self._missing_compression_parents.update(new_missing)
 
2816
 
 
2817
    def get_missing_compression_parents(self):
 
2818
        """Return the keys of missing compression parents.
 
2819
 
 
2820
        Missing compression parents occur when a record stream was missing
 
2821
        basis texts, or a index was scanned that had missing basis texts.
 
2822
        """
 
2823
        return frozenset(self._missing_compression_parents)
 
2824
 
 
2825
    def _check_read(self):
 
2826
        """raise if reads are not permitted."""
 
2827
        if not self._is_locked():
 
2828
            raise errors.ObjectNotLocked(self)
 
2829
 
 
2830
    def _check_write_ok(self):
 
2831
        """Assert if writes are not permitted."""
 
2832
        if not self._is_locked():
 
2833
            raise errors.ObjectNotLocked(self)
 
2834
 
 
2835
    def _compression_parent(self, an_entry):
 
2836
        # return the key that an_entry is compressed against, or None
 
2837
        # Grab the second parent list (as deltas implies parents currently)
 
2838
        compression_parents = an_entry[3][1]
 
2839
        if not compression_parents:
 
2840
            return None
 
2841
        if len(compression_parents) != 1:
 
2842
            raise AssertionError(
 
2843
                "Too many compression parents: %r" % compression_parents)
 
2844
        return compression_parents[0]
 
2845
 
 
2846
    def get_build_details(self, keys):
 
2847
        """Get the method, index_memo and compression parent for version_ids.
 
2848
 
 
2849
        Ghosts are omitted from the result.
 
2850
 
 
2851
        :param keys: An iterable of keys.
 
2852
        :return: A dict of key:
 
2853
            (index_memo, compression_parent, parents, record_details).
 
2854
            index_memo
 
2855
                opaque structure to pass to read_records to extract the raw
 
2856
                data
 
2857
            compression_parent
 
2858
                Content that this record is built upon, may be None
 
2859
            parents
 
2860
                Logical parents of this node
 
2861
            record_details
 
2862
                extra information about the content which needs to be passed to
 
2863
                Factory.parse_record
 
2864
        """
 
2865
        self._check_read()
 
2866
        result = {}
 
2867
        entries = self._get_entries(keys, False)
 
2868
        for entry in entries:
 
2869
            key = entry[1]
 
2870
            if not self._parents:
 
2871
                parents = ()
 
2872
            else:
 
2873
                parents = entry[3][0]
 
2874
            if not self._deltas:
 
2875
                compression_parent_key = None
 
2876
            else:
 
2877
                compression_parent_key = self._compression_parent(entry)
 
2878
            noeol = (entry[2][0] == 'N')
 
2879
            if compression_parent_key:
 
2880
                method = 'line-delta'
 
2881
            else:
 
2882
                method = 'fulltext'
 
2883
            result[key] = (self._node_to_position(entry),
 
2884
                                  compression_parent_key, parents,
 
2885
                                  (method, noeol))
 
2886
        return result
 
2887
 
 
2888
    def _get_entries(self, keys, check_present=False):
 
2889
        """Get the entries for keys.
 
2890
 
 
2891
        :param keys: An iterable of index key tuples.
 
2892
        """
 
2893
        keys = set(keys)
 
2894
        found_keys = set()
 
2895
        if self._parents:
 
2896
            for node in self._graph_index.iter_entries(keys):
 
2897
                yield node
 
2898
                found_keys.add(node[1])
 
2899
        else:
 
2900
            # adapt parentless index to the rest of the code.
 
2901
            for node in self._graph_index.iter_entries(keys):
 
2902
                yield node[0], node[1], node[2], ()
 
2903
                found_keys.add(node[1])
 
2904
        if check_present:
 
2905
            missing_keys = keys.difference(found_keys)
 
2906
            if missing_keys:
 
2907
                raise RevisionNotPresent(missing_keys.pop(), self)
 
2908
 
 
2909
    def get_method(self, key):
 
2910
        """Return compression method of specified key."""
 
2911
        return self._get_method(self._get_node(key))
 
2912
 
 
2913
    def _get_method(self, node):
 
2914
        if not self._deltas:
 
2915
            return 'fulltext'
 
2916
        if self._compression_parent(node):
 
2917
            return 'line-delta'
 
2918
        else:
 
2919
            return 'fulltext'
 
2920
 
 
2921
    def _get_node(self, key):
 
2922
        try:
 
2923
            return list(self._get_entries([key]))[0]
 
2924
        except IndexError:
 
2925
            raise RevisionNotPresent(key, self)
 
2926
 
 
2927
    def get_options(self, key):
 
2928
        """Return a list representing options.
 
2929
 
 
2930
        e.g. ['foo', 'bar']
 
2931
        """
 
2932
        node = self._get_node(key)
 
2933
        options = [self._get_method(node)]
 
2934
        if node[2][0] == 'N':
 
2935
            options.append('no-eol')
 
2936
        return options
 
2937
 
 
2938
    def get_parent_map(self, keys):
 
2939
        """Get a map of the parents of keys.
 
2940
 
 
2941
        :param keys: The keys to look up parents for.
 
2942
        :return: A mapping from keys to parents. Absent keys are absent from
 
2943
            the mapping.
 
2944
        """
 
2945
        self._check_read()
 
2946
        nodes = self._get_entries(keys)
 
2947
        result = {}
 
2948
        if self._parents:
 
2949
            for node in nodes:
 
2950
                result[node[1]] = node[3][0]
 
2951
        else:
 
2952
            for node in nodes:
 
2953
                result[node[1]] = None
 
2954
        return result
 
2955
 
 
2956
    def get_position(self, key):
 
2957
        """Return details needed to access the version.
 
2958
 
 
2959
        :return: a tuple (index, data position, size) to hand to the access
 
2960
            logic to get the record.
 
2961
        """
 
2962
        node = self._get_node(key)
 
2963
        return self._node_to_position(node)
 
2964
 
 
2965
    has_key = _mod_index._has_key_from_parent_map
 
2966
 
 
2967
    def keys(self):
 
2968
        """Get all the keys in the collection.
 
2969
 
 
2970
        The keys are not ordered.
 
2971
        """
 
2972
        self._check_read()
 
2973
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
2974
 
 
2975
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2976
 
 
2977
    def _node_to_position(self, node):
 
2978
        """Convert an index value to position details."""
 
2979
        bits = node[2][1:].split(' ')
 
2980
        return node[0], int(bits[0]), int(bits[1])
 
2981
 
 
2982
    def _sort_keys_by_io(self, keys, positions):
 
2983
        """Figure out an optimal order to read the records for the given keys.
 
2984
 
 
2985
        Sort keys, grouped by index and sorted by position.
 
2986
 
 
2987
        :param keys: A list of keys whose records we want to read. This will be
 
2988
            sorted 'in-place'.
 
2989
        :param positions: A dict, such as the one returned by
 
2990
            _get_components_positions()
 
2991
        :return: None
 
2992
        """
 
2993
        def get_index_memo(key):
 
2994
            # index_memo is at offset [1]. It is made up of (GraphIndex,
 
2995
            # position, size). GI is an object, which will be unique for each
 
2996
            # pack file. This causes us to group by pack file, then sort by
 
2997
            # position. Size doesn't matter, but it isn't worth breaking up the
 
2998
            # tuple.
 
2999
            return positions[key][1]
 
3000
        return keys.sort(key=get_index_memo)
 
3001
 
 
3002
    _get_total_build_size = _get_total_build_size
 
3003
 
 
3004
 
 
3005
class _KnitKeyAccess(object):
 
3006
    """Access to records in .knit files."""
 
3007
 
 
3008
    def __init__(self, transport, mapper):
 
3009
        """Create a _KnitKeyAccess with transport and mapper.
 
3010
 
 
3011
        :param transport: The transport the access object is rooted at.
 
3012
        :param mapper: The mapper used to map keys to .knit files.
 
3013
        """
 
3014
        self._transport = transport
 
3015
        self._mapper = mapper
 
3016
 
 
3017
    def add_raw_records(self, key_sizes, raw_data):
 
3018
        """Add raw knit bytes to a storage area.
 
3019
 
 
3020
        The data is spooled to the container writer in one bytes-record per
 
3021
        raw data item.
 
3022
 
 
3023
        :param sizes: An iterable of tuples containing the key and size of each
 
3024
            raw data segment.
 
3025
        :param raw_data: A bytestring containing the data.
 
3026
        :return: A list of memos to retrieve the record later. Each memo is an
 
3027
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
 
3028
            length), where the key is the record key.
 
3029
        """
 
3030
        if type(raw_data) != str:
 
3031
            raise AssertionError(
 
3032
                'data must be plain bytes was %s' % type(raw_data))
 
3033
        result = []
 
3034
        offset = 0
 
3035
        # TODO: This can be tuned for writing to sftp and other servers where
 
3036
        # append() is relatively expensive by grouping the writes to each key
 
3037
        # prefix.
 
3038
        for key, size in key_sizes:
 
3039
            path = self._mapper.map(key)
 
3040
            try:
 
3041
                base = self._transport.append_bytes(path + '.knit',
 
3042
                    raw_data[offset:offset+size])
 
3043
            except errors.NoSuchFile:
 
3044
                self._transport.mkdir(osutils.dirname(path))
 
3045
                base = self._transport.append_bytes(path + '.knit',
 
3046
                    raw_data[offset:offset+size])
 
3047
            # if base == 0:
 
3048
            # chmod.
 
3049
            offset += size
 
3050
            result.append((key, base, size))
 
3051
        return result
 
3052
 
 
3053
    def flush(self):
 
3054
        """Flush pending writes on this access object.
 
3055
        
 
3056
        For .knit files this is a no-op.
 
3057
        """
 
3058
        pass
 
3059
 
 
3060
    def get_raw_records(self, memos_for_retrieval):
 
3061
        """Get the raw bytes for a records.
 
3062
 
 
3063
        :param memos_for_retrieval: An iterable containing the access memo for
 
3064
            retrieving the bytes.
 
3065
        :return: An iterator over the bytes of the records.
 
3066
        """
 
3067
        # first pass, group into same-index request to minimise readv's issued.
 
3068
        request_lists = []
 
3069
        current_prefix = None
 
3070
        for (key, offset, length) in memos_for_retrieval:
 
3071
            if current_prefix == key[:-1]:
 
3072
                current_list.append((offset, length))
 
3073
            else:
 
3074
                if current_prefix is not None:
 
3075
                    request_lists.append((current_prefix, current_list))
 
3076
                current_prefix = key[:-1]
 
3077
                current_list = [(offset, length)]
 
3078
        # handle the last entry
 
3079
        if current_prefix is not None:
 
3080
            request_lists.append((current_prefix, current_list))
 
3081
        for prefix, read_vector in request_lists:
 
3082
            path = self._mapper.map(prefix) + '.knit'
 
3083
            for pos, data in self._transport.readv(path, read_vector):
 
3084
                yield data
 
3085
 
 
3086
 
 
3087
class _DirectPackAccess(object):
 
3088
    """Access to data in one or more packs with less translation."""
 
3089
 
 
3090
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
 
3091
        """Create a _DirectPackAccess object.
 
3092
 
 
3093
        :param index_to_packs: A dict mapping index objects to the transport
 
3094
            and file names for obtaining data.
 
3095
        :param reload_func: A function to call if we determine that the pack
 
3096
            files have moved and we need to reload our caches. See
 
3097
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
 
3098
        """
 
3099
        self._container_writer = None
 
3100
        self._write_index = None
 
3101
        self._indices = index_to_packs
 
3102
        self._reload_func = reload_func
 
3103
        self._flush_func = flush_func
 
3104
 
 
3105
    def add_raw_records(self, key_sizes, raw_data):
 
3106
        """Add raw knit bytes to a storage area.
 
3107
 
 
3108
        The data is spooled to the container writer in one bytes-record per
 
3109
        raw data item.
 
3110
 
 
3111
        :param sizes: An iterable of tuples containing the key and size of each
 
3112
            raw data segment.
 
3113
        :param raw_data: A bytestring containing the data.
 
3114
        :return: A list of memos to retrieve the record later. Each memo is an
 
3115
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
 
3116
            length), where the index field is the write_index object supplied
 
3117
            to the PackAccess object.
 
3118
        """
 
3119
        if type(raw_data) != str:
 
3120
            raise AssertionError(
 
3121
                'data must be plain bytes was %s' % type(raw_data))
 
3122
        result = []
 
3123
        offset = 0
 
3124
        for key, size in key_sizes:
 
3125
            p_offset, p_length = self._container_writer.add_bytes_record(
 
3126
                raw_data[offset:offset+size], [])
 
3127
            offset += size
 
3128
            result.append((self._write_index, p_offset, p_length))
 
3129
        return result
 
3130
 
 
3131
    def flush(self):
 
3132
        """Flush pending writes on this access object.
 
3133
 
 
3134
        This will flush any buffered writes to a NewPack.
 
3135
        """
 
3136
        if self._flush_func is not None:
 
3137
            self._flush_func()
 
3138
            
 
3139
    def get_raw_records(self, memos_for_retrieval):
 
3140
        """Get the raw bytes for a records.
 
3141
 
 
3142
        :param memos_for_retrieval: An iterable containing the (index, pos,
 
3143
            length) memo for retrieving the bytes. The Pack access method
 
3144
            looks up the pack to use for a given record in its index_to_pack
 
3145
            map.
 
3146
        :return: An iterator over the bytes of the records.
 
3147
        """
 
3148
        # first pass, group into same-index requests
 
3149
        request_lists = []
 
3150
        current_index = None
 
3151
        for (index, offset, length) in memos_for_retrieval:
 
3152
            if current_index == index:
 
3153
                current_list.append((offset, length))
 
3154
            else:
 
3155
                if current_index is not None:
 
3156
                    request_lists.append((current_index, current_list))
 
3157
                current_index = index
 
3158
                current_list = [(offset, length)]
 
3159
        # handle the last entry
 
3160
        if current_index is not None:
 
3161
            request_lists.append((current_index, current_list))
 
3162
        for index, offsets in request_lists:
 
3163
            try:
 
3164
                transport, path = self._indices[index]
 
3165
            except KeyError:
 
3166
                # A KeyError here indicates that someone has triggered an index
 
3167
                # reload, and this index has gone missing, we need to start
 
3168
                # over.
 
3169
                if self._reload_func is None:
 
3170
                    # If we don't have a _reload_func there is nothing that can
 
3171
                    # be done
 
3172
                    raise
 
3173
                raise errors.RetryWithNewPacks(index,
 
3174
                                               reload_occurred=True,
 
3175
                                               exc_info=sys.exc_info())
 
3176
            try:
 
3177
                reader = pack.make_readv_reader(transport, path, offsets)
 
3178
                for names, read_func in reader.iter_records():
 
3179
                    yield read_func(None)
 
3180
            except errors.NoSuchFile:
 
3181
                # A NoSuchFile error indicates that a pack file has gone
 
3182
                # missing on disk, we need to trigger a reload, and start over.
 
3183
                if self._reload_func is None:
 
3184
                    raise
 
3185
                raise errors.RetryWithNewPacks(transport.abspath(path),
 
3186
                                               reload_occurred=False,
 
3187
                                               exc_info=sys.exc_info())
 
3188
 
 
3189
    def set_writer(self, writer, index, transport_packname):
 
3190
        """Set a writer to use for adding data."""
 
3191
        if index is not None:
 
3192
            self._indices[index] = transport_packname
 
3193
        self._container_writer = writer
 
3194
        self._write_index = index
 
3195
 
 
3196
    def reload_or_raise(self, retry_exc):
 
3197
        """Try calling the reload function, or re-raise the original exception.
 
3198
 
 
3199
        This should be called after _DirectPackAccess raises a
 
3200
        RetryWithNewPacks exception. This function will handle the common logic
 
3201
        of determining when the error is fatal versus being temporary.
 
3202
        It will also make sure that the original exception is raised, rather
 
3203
        than the RetryWithNewPacks exception.
 
3204
 
 
3205
        If this function returns, then the calling function should retry
 
3206
        whatever operation was being performed. Otherwise an exception will
 
3207
        be raised.
 
3208
 
 
3209
        :param retry_exc: A RetryWithNewPacks exception.
 
3210
        """
 
3211
        is_error = False
 
3212
        if self._reload_func is None:
 
3213
            is_error = True
 
3214
        elif not self._reload_func():
 
3215
            # The reload claimed that nothing changed
 
3216
            if not retry_exc.reload_occurred:
 
3217
                # If there wasn't an earlier reload, then we really were
 
3218
                # expecting to find changes. We didn't find them, so this is a
 
3219
                # hard error
 
3220
                is_error = True
 
3221
        if is_error:
 
3222
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
 
3223
            raise exc_class, exc_value, exc_traceback
 
3224
 
 
3225
 
 
3226
# Deprecated, use PatienceSequenceMatcher instead
 
3227
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
 
3228
 
 
3229
 
 
3230
def annotate_knit(knit, revision_id):
 
3231
    """Annotate a knit with no cached annotations.
 
3232
 
 
3233
    This implementation is for knits with no cached annotations.
 
3234
    It will work for knits with cached annotations, but this is not
 
3235
    recommended.
 
3236
    """
 
3237
    annotator = _KnitAnnotator(knit)
 
3238
    return iter(annotator.annotate(revision_id))
 
3239
 
 
3240
 
 
3241
class _KnitAnnotator(object):
 
3242
    """Build up the annotations for a text."""
 
3243
 
 
3244
    def __init__(self, knit):
 
3245
        self._knit = knit
 
3246
 
 
3247
        # Content objects, differs from fulltexts because of how final newlines
 
3248
        # are treated by knits. the content objects here will always have a
 
3249
        # final newline
 
3250
        self._fulltext_contents = {}
 
3251
 
 
3252
        # Annotated lines of specific revisions
 
3253
        self._annotated_lines = {}
 
3254
 
 
3255
        # Track the raw data for nodes that we could not process yet.
 
3256
        # This maps the revision_id of the base to a list of children that will
 
3257
        # annotated from it.
 
3258
        self._pending_children = {}
 
3259
 
 
3260
        # Nodes which cannot be extracted
 
3261
        self._ghosts = set()
 
3262
 
 
3263
        # Track how many children this node has, so we know if we need to keep
 
3264
        # it
 
3265
        self._annotate_children = {}
 
3266
        self._compression_children = {}
 
3267
 
 
3268
        self._all_build_details = {}
 
3269
        # The children => parent revision_id graph
 
3270
        self._revision_id_graph = {}
 
3271
 
 
3272
        self._heads_provider = None
 
3273
 
 
3274
        self._nodes_to_keep_annotations = set()
 
3275
        self._generations_until_keep = 100
 
3276
 
 
3277
    def set_generations_until_keep(self, value):
 
3278
        """Set the number of generations before caching a node.
 
3279
 
 
3280
        Setting this to -1 will cache every merge node, setting this higher
 
3281
        will cache fewer nodes.
 
3282
        """
 
3283
        self._generations_until_keep = value
 
3284
 
 
3285
    def _add_fulltext_content(self, revision_id, content_obj):
 
3286
        self._fulltext_contents[revision_id] = content_obj
 
3287
        # TODO: jam 20080305 It might be good to check the sha1digest here
 
3288
        return content_obj.text()
 
3289
 
 
3290
    def _check_parents(self, child, nodes_to_annotate):
 
3291
        """Check if all parents have been processed.
 
3292
 
 
3293
        :param child: A tuple of (rev_id, parents, raw_content)
 
3294
        :param nodes_to_annotate: If child is ready, add it to
 
3295
            nodes_to_annotate, otherwise put it back in self._pending_children
 
3296
        """
 
3297
        for parent_id in child[1]:
 
3298
            if (parent_id not in self._annotated_lines):
 
3299
                # This parent is present, but another parent is missing
 
3300
                self._pending_children.setdefault(parent_id,
 
3301
                                                  []).append(child)
 
3302
                break
 
3303
        else:
 
3304
            # This one is ready to be processed
 
3305
            nodes_to_annotate.append(child)
 
3306
 
 
3307
    def _add_annotation(self, revision_id, fulltext, parent_ids,
 
3308
                        left_matching_blocks=None):
 
3309
        """Add an annotation entry.
 
3310
 
 
3311
        All parents should already have been annotated.
 
3312
        :return: A list of children that now have their parents satisfied.
 
3313
        """
 
3314
        a = self._annotated_lines
 
3315
        annotated_parent_lines = [a[p] for p in parent_ids]
 
3316
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
 
3317
            fulltext, revision_id, left_matching_blocks,
 
3318
            heads_provider=self._get_heads_provider()))
 
3319
        self._annotated_lines[revision_id] = annotated_lines
 
3320
        for p in parent_ids:
 
3321
            ann_children = self._annotate_children[p]
 
3322
            ann_children.remove(revision_id)
 
3323
            if (not ann_children
 
3324
                and p not in self._nodes_to_keep_annotations):
 
3325
                del self._annotated_lines[p]
 
3326
                del self._all_build_details[p]
 
3327
                if p in self._fulltext_contents:
 
3328
                    del self._fulltext_contents[p]
 
3329
        # Now that we've added this one, see if there are any pending
 
3330
        # deltas to be done, certainly this parent is finished
 
3331
        nodes_to_annotate = []
 
3332
        for child in self._pending_children.pop(revision_id, []):
 
3333
            self._check_parents(child, nodes_to_annotate)
 
3334
        return nodes_to_annotate
 
3335
 
 
3336
    def _get_build_graph(self, key):
 
3337
        """Get the graphs for building texts and annotations.
 
3338
 
 
3339
        The data you need for creating a full text may be different than the
 
3340
        data you need to annotate that text. (At a minimum, you need both
 
3341
        parents to create an annotation, but only need 1 parent to generate the
 
3342
        fulltext.)
 
3343
 
 
3344
        :return: A list of (key, index_memo) records, suitable for
 
3345
            passing to read_records_iter to start reading in the raw data fro/
 
3346
            the pack file.
 
3347
        """
 
3348
        if key in self._annotated_lines:
 
3349
            # Nothing to do
 
3350
            return []
 
3351
        pending = set([key])
 
3352
        records = []
 
3353
        generation = 0
 
3354
        kept_generation = 0
 
3355
        while pending:
 
3356
            # get all pending nodes
 
3357
            generation += 1
 
3358
            this_iteration = pending
 
3359
            build_details = self._knit._index.get_build_details(this_iteration)
 
3360
            self._all_build_details.update(build_details)
 
3361
            # new_nodes = self._knit._index._get_entries(this_iteration)
 
3362
            pending = set()
 
3363
            for key, details in build_details.iteritems():
 
3364
                (index_memo, compression_parent, parents,
 
3365
                 record_details) = details
 
3366
                self._revision_id_graph[key] = parents
 
3367
                records.append((key, index_memo))
 
3368
                # Do we actually need to check _annotated_lines?
 
3369
                pending.update(p for p in parents
 
3370
                                 if p not in self._all_build_details)
 
3371
                if compression_parent:
 
3372
                    self._compression_children.setdefault(compression_parent,
 
3373
                        []).append(key)
 
3374
                if parents:
 
3375
                    for parent in parents:
 
3376
                        self._annotate_children.setdefault(parent,
 
3377
                            []).append(key)
 
3378
                    num_gens = generation - kept_generation
 
3379
                    if ((num_gens >= self._generations_until_keep)
 
3380
                        and len(parents) > 1):
 
3381
                        kept_generation = generation
 
3382
                        self._nodes_to_keep_annotations.add(key)
 
3383
 
 
3384
            missing_versions = this_iteration.difference(build_details.keys())
 
3385
            self._ghosts.update(missing_versions)
 
3386
            for missing_version in missing_versions:
 
3387
                # add a key, no parents
 
3388
                self._revision_id_graph[missing_version] = ()
 
3389
                pending.discard(missing_version) # don't look for it
 
3390
        if self._ghosts.intersection(self._compression_children):
 
3391
            raise KnitCorrupt(
 
3392
                "We cannot have nodes which have a ghost compression parent:\n"
 
3393
                "ghosts: %r\n"
 
3394
                "compression children: %r"
 
3395
                % (self._ghosts, self._compression_children))
 
3396
        # Cleanout anything that depends on a ghost so that we don't wait for
 
3397
        # the ghost to show up
 
3398
        for node in self._ghosts:
 
3399
            if node in self._annotate_children:
 
3400
                # We won't be building this node
 
3401
                del self._annotate_children[node]
 
3402
        # Generally we will want to read the records in reverse order, because
 
3403
        # we find the parent nodes after the children
 
3404
        records.reverse()
 
3405
        return records
 
3406
 
 
3407
    def _annotate_records(self, records):
 
3408
        """Build the annotations for the listed records."""
 
3409
        # We iterate in the order read, rather than a strict order requested
 
3410
        # However, process what we can, and put off to the side things that
 
3411
        # still need parents, cleaning them up when those parents are
 
3412
        # processed.
 
3413
        for (rev_id, record,
 
3414
             digest) in self._knit._read_records_iter(records):
 
3415
            if rev_id in self._annotated_lines:
 
3416
                continue
 
3417
            parent_ids = self._revision_id_graph[rev_id]
 
3418
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
 
3419
            details = self._all_build_details[rev_id]
 
3420
            (index_memo, compression_parent, parents,
 
3421
             record_details) = details
 
3422
            nodes_to_annotate = []
 
3423
            # TODO: Remove the punning between compression parents, and
 
3424
            #       parent_ids, we should be able to do this without assuming
 
3425
            #       the build order
 
3426
            if len(parent_ids) == 0:
 
3427
                # There are no parents for this node, so just add it
 
3428
                # TODO: This probably needs to be decoupled
 
3429
                fulltext_content, delta = self._knit._factory.parse_record(
 
3430
                    rev_id, record, record_details, None)
 
3431
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
 
3432
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
 
3433
                    parent_ids, left_matching_blocks=None))
 
3434
            else:
 
3435
                child = (rev_id, parent_ids, record)
 
3436
                # Check if all the parents are present
 
3437
                self._check_parents(child, nodes_to_annotate)
 
3438
            while nodes_to_annotate:
 
3439
                # Should we use a queue here instead of a stack?
 
3440
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
 
3441
                (index_memo, compression_parent, parents,
 
3442
                 record_details) = self._all_build_details[rev_id]
 
3443
                blocks = None
 
3444
                if compression_parent is not None:
 
3445
                    comp_children = self._compression_children[compression_parent]
 
3446
                    if rev_id not in comp_children:
 
3447
                        raise AssertionError("%r not in compression children %r"
 
3448
                            % (rev_id, comp_children))
 
3449
                    # If there is only 1 child, it is safe to reuse this
 
3450
                    # content
 
3451
                    reuse_content = (len(comp_children) == 1
 
3452
                        and compression_parent not in
 
3453
                            self._nodes_to_keep_annotations)
 
3454
                    if reuse_content:
 
3455
                        # Remove it from the cache since it will be changing
 
3456
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
 
3457
                        # Make sure to copy the fulltext since it might be
 
3458
                        # modified
 
3459
                        parent_fulltext = list(parent_fulltext_content.text())
 
3460
                    else:
 
3461
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
 
3462
                        parent_fulltext = parent_fulltext_content.text()
 
3463
                    comp_children.remove(rev_id)
 
3464
                    fulltext_content, delta = self._knit._factory.parse_record(
 
3465
                        rev_id, record, record_details,
 
3466
                        parent_fulltext_content,
 
3467
                        copy_base_content=(not reuse_content))
 
3468
                    fulltext = self._add_fulltext_content(rev_id,
 
3469
                                                          fulltext_content)
 
3470
                    if compression_parent == parent_ids[0]:
 
3471
                        # the compression_parent is the left parent, so we can
 
3472
                        # re-use the delta
 
3473
                        blocks = KnitContent.get_line_delta_blocks(delta,
 
3474
                                parent_fulltext, fulltext)
 
3475
                else:
 
3476
                    fulltext_content = self._knit._factory.parse_fulltext(
 
3477
                        record, rev_id)
 
3478
                    fulltext = self._add_fulltext_content(rev_id,
 
3479
                        fulltext_content)
 
3480
                nodes_to_annotate.extend(
 
3481
                    self._add_annotation(rev_id, fulltext, parent_ids,
 
3482
                                     left_matching_blocks=blocks))
 
3483
 
 
3484
    def _get_heads_provider(self):
 
3485
        """Create a heads provider for resolving ancestry issues."""
 
3486
        if self._heads_provider is not None:
 
3487
            return self._heads_provider
 
3488
        parent_provider = _mod_graph.DictParentsProvider(
 
3489
            self._revision_id_graph)
 
3490
        graph_obj = _mod_graph.Graph(parent_provider)
 
3491
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
 
3492
        self._heads_provider = head_cache
 
3493
        return head_cache
 
3494
 
 
3495
    def annotate(self, key):
 
3496
        """Return the annotated fulltext at the given key.
 
3497
 
 
3498
        :param key: The key to annotate.
 
3499
        """
 
3500
        if len(self._knit._fallback_vfs) > 0:
 
3501
            # stacked knits can't use the fast path at present.
 
3502
            return self._simple_annotate(key)
 
3503
        while True:
 
3504
            try:
 
3505
                records = self._get_build_graph(key)
 
3506
                if key in self._ghosts:
 
3507
                    raise errors.RevisionNotPresent(key, self._knit)
 
3508
                self._annotate_records(records)
 
3509
                return self._annotated_lines[key]
 
3510
            except errors.RetryWithNewPacks, e:
 
3511
                self._knit._access.reload_or_raise(e)
 
3512
                # The cached build_details are no longer valid
 
3513
                self._all_build_details.clear()
 
3514
 
 
3515
    def _simple_annotate(self, key):
 
3516
        """Return annotated fulltext, rediffing from the full texts.
 
3517
 
 
3518
        This is slow but makes no assumptions about the repository
 
3519
        being able to produce line deltas.
 
3520
        """
 
3521
        # TODO: this code generates a parent maps of present ancestors; it
 
3522
        # could be split out into a separate method, and probably should use
 
3523
        # iter_ancestry instead. -- mbp and robertc 20080704
 
3524
        graph = _mod_graph.Graph(self._knit)
 
3525
        head_cache = _mod_graph.FrozenHeadsCache(graph)
 
3526
        search = graph._make_breadth_first_searcher([key])
 
3527
        keys = set()
 
3528
        while True:
 
3529
            try:
 
3530
                present, ghosts = search.next_with_ghosts()
 
3531
            except StopIteration:
 
3532
                break
 
3533
            keys.update(present)
 
3534
        parent_map = self._knit.get_parent_map(keys)
 
3535
        parent_cache = {}
 
3536
        reannotate = annotate.reannotate
 
3537
        for record in self._knit.get_record_stream(keys, 'topological', True):
 
3538
            key = record.key
 
3539
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
3540
            parents = parent_map[key]
 
3541
            if parents is not None:
 
3542
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]
 
3543
            else:
 
3544
                parent_lines = []
 
3545
            parent_cache[key] = list(
 
3546
                reannotate(parent_lines, fulltext, key, None, head_cache))
 
3547
        try:
 
3548
            return parent_cache[key]
 
3549
        except KeyError, e:
 
3550
            raise errors.RevisionNotPresent(key, self._knit)
 
3551
 
 
3552
 
 
3553
try:
 
3554
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
 
3555
except ImportError:
 
3556
    from bzrlib._knit_load_data_py import _load_data_py as _load_data