/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Andrew Bennetts
  • Date: 2010-01-12 03:53:21 UTC
  • mfrom: (4948 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4964.
  • Revision ID: andrew.bennetts@canonical.com-20100112035321-hofpz5p10224ryj3
Merge lp:bzr, resolving conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Knit versionedfile implementation.
18
18
 
20
20
updates.
21
21
 
22
22
Knit file layout:
23
 
lifeless: the data file is made up of "delta records".  each delta record has a delta header 
24
 
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of 
25
 
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a 
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
26
26
end-marker; simply "end VERSION"
27
27
 
28
28
delta can be line or full contents.a
35
35
130,130,2
36
36
8         if elt.get('executable') == 'yes':
37
37
8             ie.executable = True
38
 
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
39
39
 
40
40
 
41
41
whats in an index:
51
51
 
52
52
"""
53
53
 
54
 
# TODOS:
55
 
# 10:16 < lifeless> make partial index writes safe
56
 
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
57
 
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
58
 
#                    always' approach.
59
 
# move sha1 out of the content so that join is faster at verifying parents
60
 
# record content length ?
61
 
                  
62
54
 
63
55
from cStringIO import StringIO
64
 
from itertools import izip, chain
 
56
from itertools import izip
65
57
import operator
66
58
import os
67
59
import sys
77
69
    lru_cache,
78
70
    pack,
79
71
    progress,
 
72
    static_tuple,
80
73
    trace,
81
74
    tsort,
82
75
    tuned_gzip,
111
104
    ConstantMapper,
112
105
    ContentFactory,
113
106
    ChunkedContentFactory,
 
107
    sort_groupcompress,
114
108
    VersionedFile,
115
109
    VersionedFiles,
116
110
    )
131
125
 
132
126
DATA_SUFFIX = '.knit'
133
127
INDEX_SUFFIX = '.kndx'
 
128
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
134
129
 
135
130
 
136
131
class KnitAdapter(object):
138
133
 
139
134
    def __init__(self, basis_vf):
140
135
        """Create an adapter which accesses full texts from basis_vf.
141
 
        
 
136
 
142
137
        :param basis_vf: A versioned file to access basis texts of deltas from.
143
138
            May be None for adapters that do not need to access basis texts.
144
139
        """
151
146
class FTAnnotatedToUnannotated(KnitAdapter):
152
147
    """An adapter from FT annotated knits to unannotated ones."""
153
148
 
154
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
149
    def get_bytes(self, factory):
 
150
        annotated_compressed_bytes = factory._raw_record
155
151
        rec, contents = \
156
152
            self._data._parse_record_unchecked(annotated_compressed_bytes)
157
153
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
162
158
class DeltaAnnotatedToUnannotated(KnitAdapter):
163
159
    """An adapter for deltas from annotated to unannotated."""
164
160
 
165
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
161
    def get_bytes(self, factory):
 
162
        annotated_compressed_bytes = factory._raw_record
166
163
        rec, contents = \
167
164
            self._data._parse_record_unchecked(annotated_compressed_bytes)
168
165
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
175
172
class FTAnnotatedToFullText(KnitAdapter):
176
173
    """An adapter from FT annotated knits to unannotated ones."""
177
174
 
178
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
175
    def get_bytes(self, factory):
 
176
        annotated_compressed_bytes = factory._raw_record
179
177
        rec, contents = \
180
178
            self._data._parse_record_unchecked(annotated_compressed_bytes)
181
179
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
186
184
class DeltaAnnotatedToFullText(KnitAdapter):
187
185
    """An adapter for deltas from annotated to unannotated."""
188
186
 
189
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
187
    def get_bytes(self, factory):
 
188
        annotated_compressed_bytes = factory._raw_record
190
189
        rec, contents = \
191
190
            self._data._parse_record_unchecked(annotated_compressed_bytes)
192
191
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
209
208
class FTPlainToFullText(KnitAdapter):
210
209
    """An adapter from FT plain knits to unannotated ones."""
211
210
 
212
 
    def get_bytes(self, factory, compressed_bytes):
 
211
    def get_bytes(self, factory):
 
212
        compressed_bytes = factory._raw_record
213
213
        rec, contents = \
214
214
            self._data._parse_record_unchecked(compressed_bytes)
215
215
        content, delta = self._plain_factory.parse_record(factory.key[-1],
220
220
class DeltaPlainToFullText(KnitAdapter):
221
221
    """An adapter for deltas from annotated to unannotated."""
222
222
 
223
 
    def get_bytes(self, factory, compressed_bytes):
 
223
    def get_bytes(self, factory):
 
224
        compressed_bytes = factory._raw_record
224
225
        rec, contents = \
225
226
            self._data._parse_record_unchecked(compressed_bytes)
226
227
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
242
243
 
243
244
class KnitContentFactory(ContentFactory):
244
245
    """Content factory for streaming from knits.
245
 
    
 
246
 
246
247
    :seealso ContentFactory:
247
248
    """
248
249
 
249
250
    def __init__(self, key, parents, build_details, sha1, raw_record,
250
 
        annotated, knit=None):
 
251
        annotated, knit=None, network_bytes=None):
251
252
        """Create a KnitContentFactory for key.
252
 
        
 
253
 
253
254
        :param key: The key.
254
255
        :param parents: The parents.
255
256
        :param build_details: The build details as returned from
257
258
        :param sha1: The sha1 expected from the full text of this object.
258
259
        :param raw_record: The bytes of the knit data from disk.
259
260
        :param annotated: True if the raw data is annotated.
 
261
        :param network_bytes: None to calculate the network bytes on demand,
 
262
            not-none if they are already known.
260
263
        """
261
264
        ContentFactory.__init__(self)
262
265
        self.sha1 = sha1
272
275
            annotated_kind = ''
273
276
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
274
277
        self._raw_record = raw_record
 
278
        self._network_bytes = network_bytes
275
279
        self._build_details = build_details
276
280
        self._knit = knit
277
281
 
 
282
    def _create_network_bytes(self):
 
283
        """Create a fully serialised network version for transmission."""
 
284
        # storage_kind, key, parents, Noeol, raw_record
 
285
        key_bytes = '\x00'.join(self.key)
 
286
        if self.parents is None:
 
287
            parent_bytes = 'None:'
 
288
        else:
 
289
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
290
        if self._build_details[1]:
 
291
            noeol = 'N'
 
292
        else:
 
293
            noeol = ' '
 
294
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
295
            parent_bytes, noeol, self._raw_record)
 
296
        self._network_bytes = network_bytes
 
297
 
278
298
    def get_bytes_as(self, storage_kind):
279
299
        if storage_kind == self.storage_kind:
280
 
            return self._raw_record
 
300
            if self._network_bytes is None:
 
301
                self._create_network_bytes()
 
302
            return self._network_bytes
 
303
        if ('-ft-' in self.storage_kind and
 
304
            storage_kind in ('chunked', 'fulltext')):
 
305
            adapter_key = (self.storage_kind, 'fulltext')
 
306
            adapter_factory = adapter_registry.get(adapter_key)
 
307
            adapter = adapter_factory(None)
 
308
            bytes = adapter.get_bytes(self)
 
309
            if storage_kind == 'chunked':
 
310
                return [bytes]
 
311
            else:
 
312
                return bytes
281
313
        if self._knit is not None:
 
314
            # Not redundant with direct conversion above - that only handles
 
315
            # fulltext cases.
282
316
            if storage_kind == 'chunked':
283
317
                return self._knit.get_lines(self.key[0])
284
318
            elif storage_kind == 'fulltext':
287
321
            self.storage_kind)
288
322
 
289
323
 
 
324
class LazyKnitContentFactory(ContentFactory):
 
325
    """A ContentFactory which can either generate full text or a wire form.
 
326
 
 
327
    :seealso ContentFactory:
 
328
    """
 
329
 
 
330
    def __init__(self, key, parents, generator, first):
 
331
        """Create a LazyKnitContentFactory.
 
332
 
 
333
        :param key: The key of the record.
 
334
        :param parents: The parents of the record.
 
335
        :param generator: A _ContentMapGenerator containing the record for this
 
336
            key.
 
337
        :param first: Is this the first content object returned from generator?
 
338
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
339
            knit-delta-closure-ref
 
340
        """
 
341
        self.key = key
 
342
        self.parents = parents
 
343
        self.sha1 = None
 
344
        self._generator = generator
 
345
        self.storage_kind = "knit-delta-closure"
 
346
        if not first:
 
347
            self.storage_kind = self.storage_kind + "-ref"
 
348
        self._first = first
 
349
 
 
350
    def get_bytes_as(self, storage_kind):
 
351
        if storage_kind == self.storage_kind:
 
352
            if self._first:
 
353
                return self._generator._wire_bytes()
 
354
            else:
 
355
                # all the keys etc are contained in the bytes returned in the
 
356
                # first record.
 
357
                return ''
 
358
        if storage_kind in ('chunked', 'fulltext'):
 
359
            chunks = self._generator._get_one_work(self.key).text()
 
360
            if storage_kind == 'chunked':
 
361
                return chunks
 
362
            else:
 
363
                return ''.join(chunks)
 
364
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
365
            self.storage_kind)
 
366
 
 
367
 
 
368
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
369
    """Convert a network record to a iterator over stream records.
 
370
 
 
371
    :param storage_kind: The storage kind of the record.
 
372
        Must be 'knit-delta-closure'.
 
373
    :param bytes: The bytes of the record on the network.
 
374
    """
 
375
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
376
    return generator.get_record_stream()
 
377
 
 
378
 
 
379
def knit_network_to_record(storage_kind, bytes, line_end):
 
380
    """Convert a network record to a record object.
 
381
 
 
382
    :param storage_kind: The storage kind of the record.
 
383
    :param bytes: The bytes of the record on the network.
 
384
    """
 
385
    start = line_end
 
386
    line_end = bytes.find('\n', start)
 
387
    key = tuple(bytes[start:line_end].split('\x00'))
 
388
    start = line_end + 1
 
389
    line_end = bytes.find('\n', start)
 
390
    parent_line = bytes[start:line_end]
 
391
    if parent_line == 'None:':
 
392
        parents = None
 
393
    else:
 
394
        parents = tuple(
 
395
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
396
             if segment])
 
397
    start = line_end + 1
 
398
    noeol = bytes[start] == 'N'
 
399
    if 'ft' in storage_kind:
 
400
        method = 'fulltext'
 
401
    else:
 
402
        method = 'line-delta'
 
403
    build_details = (method, noeol)
 
404
    start = start + 1
 
405
    raw_record = bytes[start:]
 
406
    annotated = 'annotated' in storage_kind
 
407
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
408
        annotated, network_bytes=bytes)]
 
409
 
 
410
 
290
411
class KnitContent(object):
291
412
    """Content of a knit version to which deltas can be applied.
292
 
    
 
413
 
293
414
    This is always stored in memory as a list of lines with \n at the end,
294
 
    plus a flag saying if the final ending is really there or not, because that 
 
415
    plus a flag saying if the final ending is really there or not, because that
295
416
    corresponds to the on-disk knit representation.
296
417
    """
297
418
 
386
507
 
387
508
class PlainKnitContent(KnitContent):
388
509
    """Unannotated content.
389
 
    
 
510
 
390
511
    When annotate[_iter] is called on this content, the same version is reported
391
512
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
392
513
    objects.
544
665
 
545
666
        see parse_fulltext which this inverts.
546
667
        """
547
 
        # TODO: jam 20070209 We only do the caching thing to make sure that
548
 
        #       the origin is a valid utf-8 line, eventually we could remove it
549
668
        return ['%s %s' % (o, t) for o, t in content._lines]
550
669
 
551
670
    def lower_line_delta(self, delta):
566
685
        content = knit._get_content(key)
567
686
        # adjust for the fact that serialised annotations are only key suffixes
568
687
        # for this factory.
569
 
        if type(key) == tuple:
 
688
        if type(key) is tuple:
570
689
            prefix = key[:-1]
571
690
            origins = content.annotate()
572
691
            result = []
638
757
 
639
758
    def annotate(self, knit, key):
640
759
        annotator = _KnitAnnotator(knit)
641
 
        return annotator.annotate(key)
 
760
        return annotator.annotate_flat(key)
642
761
 
643
762
 
644
763
 
647
766
 
648
767
    This is only functional enough to run interface tests, it doesn't try to
649
768
    provide a full pack environment.
650
 
    
 
769
 
651
770
    :param annotated: knit annotations are wanted.
652
771
    :param mapper: The mapper from keys to paths.
653
772
    """
663
782
 
664
783
    This is only functional enough to run interface tests, it doesn't try to
665
784
    provide a full pack environment.
666
 
    
 
785
 
667
786
    :param graph: Store a graph.
668
787
    :param delta: Delta compress contents.
669
788
    :param keylength: How long should keys be.
700
819
    versioned_files.writer.end()
701
820
 
702
821
 
 
822
def _get_total_build_size(self, keys, positions):
 
823
    """Determine the total bytes to build these keys.
 
824
 
 
825
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
826
    don't inherit from a common base.)
 
827
 
 
828
    :param keys: Keys that we want to build
 
829
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
830
        as returned by _get_components_positions)
 
831
    :return: Number of bytes to build those keys
 
832
    """
 
833
    all_build_index_memos = {}
 
834
    build_keys = keys
 
835
    while build_keys:
 
836
        next_keys = set()
 
837
        for key in build_keys:
 
838
            # This is mostly for the 'stacked' case
 
839
            # Where we will be getting the data from a fallback
 
840
            if key not in positions:
 
841
                continue
 
842
            _, index_memo, compression_parent = positions[key]
 
843
            all_build_index_memos[key] = index_memo
 
844
            if compression_parent not in all_build_index_memos:
 
845
                next_keys.add(compression_parent)
 
846
        build_keys = next_keys
 
847
    return sum([index_memo[2] for index_memo
 
848
                in all_build_index_memos.itervalues()])
 
849
 
 
850
 
703
851
class KnitVersionedFiles(VersionedFiles):
704
852
    """Storage for many versioned files using knit compression.
705
853
 
706
854
    Backend storage is managed by indices and data objects.
707
855
 
708
 
    :ivar _index: A _KnitGraphIndex or similar that can describe the 
709
 
        parents, graph, compression and data location of entries in this 
710
 
        KnitVersionedFiles.  Note that this is only the index for 
 
856
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
857
        parents, graph, compression and data location of entries in this
 
858
        KnitVersionedFiles.  Note that this is only the index for
711
859
        *this* vfs; if there are fallbacks they must be queried separately.
712
860
    """
713
861
 
760
908
            # indexes can't directly store that, so we give them
761
909
            # an empty tuple instead.
762
910
            parents = ()
 
911
        line_bytes = ''.join(lines)
763
912
        return self._add(key, lines, parents,
764
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
 
913
            parent_texts, left_matching_blocks, nostore_sha, random_id,
 
914
            line_bytes=line_bytes)
 
915
 
 
916
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
917
        """See VersionedFiles._add_text()."""
 
918
        self._index._check_write_ok()
 
919
        self._check_add(key, None, random_id, check_content=False)
 
920
        if text.__class__ is not str:
 
921
            raise errors.BzrBadParameterUnicode("text")
 
922
        if parents is None:
 
923
            # The caller might pass None if there is no graph data, but kndx
 
924
            # indexes can't directly store that, so we give them
 
925
            # an empty tuple instead.
 
926
            parents = ()
 
927
        return self._add(key, None, parents,
 
928
            None, None, nostore_sha, random_id,
 
929
            line_bytes=text)
765
930
 
766
931
    def _add(self, key, lines, parents, parent_texts,
767
 
        left_matching_blocks, nostore_sha, random_id):
 
932
        left_matching_blocks, nostore_sha, random_id,
 
933
        line_bytes):
768
934
        """Add a set of lines on top of version specified by parents.
769
935
 
770
936
        Any versions not present will be converted into ghosts.
 
937
 
 
938
        :param lines: A list of strings where each one is a single line (has a
 
939
            single newline at the end of the string) This is now optional
 
940
            (callers can pass None). It is left in its location for backwards
 
941
            compatibility. It should ''.join(lines) must == line_bytes
 
942
        :param line_bytes: A single string containing the content
 
943
 
 
944
        We pass both lines and line_bytes because different routes bring the
 
945
        values to this function. And for memory efficiency, we don't want to
 
946
        have to split/join on-demand.
771
947
        """
772
948
        # first thing, if the content is something we don't need to store, find
773
949
        # that out.
774
 
        line_bytes = ''.join(lines)
775
950
        digest = sha_string(line_bytes)
776
951
        if nostore_sha == digest:
777
952
            raise errors.ExistingContent
798
973
 
799
974
        text_length = len(line_bytes)
800
975
        options = []
801
 
        if lines:
802
 
            if lines[-1][-1] != '\n':
803
 
                # copy the contents of lines.
 
976
        no_eol = False
 
977
        # Note: line_bytes is not modified to add a newline, that is tracked
 
978
        #       via the no_eol flag. 'lines' *is* modified, because that is the
 
979
        #       general values needed by the Content code.
 
980
        if line_bytes and line_bytes[-1] != '\n':
 
981
            options.append('no-eol')
 
982
            no_eol = True
 
983
            # Copy the existing list, or create a new one
 
984
            if lines is None:
 
985
                lines = osutils.split_lines(line_bytes)
 
986
            else:
804
987
                lines = lines[:]
805
 
                options.append('no-eol')
806
 
                lines[-1] = lines[-1] + '\n'
807
 
                line_bytes += '\n'
 
988
            # Replace the last line with one that ends in a final newline
 
989
            lines[-1] = lines[-1] + '\n'
 
990
        if lines is None:
 
991
            lines = osutils.split_lines(line_bytes)
808
992
 
809
 
        for element in key:
810
 
            if type(element) != str:
 
993
        for element in key[:-1]:
 
994
            if type(element) is not str:
 
995
                raise TypeError("key contains non-strings: %r" % (key,))
 
996
        if key[-1] is None:
 
997
            key = key[:-1] + ('sha1:' + digest,)
 
998
        elif type(key[-1]) is not str:
811
999
                raise TypeError("key contains non-strings: %r" % (key,))
812
1000
        # Knit hunks are still last-element only
813
1001
        version_id = key[-1]
814
1002
        content = self._factory.make(lines, version_id)
815
 
        if 'no-eol' in options:
 
1003
        if no_eol:
816
1004
            # Hint to the content object that its text() call should strip the
817
1005
            # EOL.
818
1006
            content._should_strip_eol = True
830
1018
        else:
831
1019
            options.append('fulltext')
832
1020
            # isinstance is slower and we have no hierarchy.
833
 
            if self._factory.__class__ == KnitPlainFactory:
 
1021
            if self._factory.__class__ is KnitPlainFactory:
834
1022
                # Use the already joined bytes saving iteration time in
835
1023
                # _record_to_data.
 
1024
                dense_lines = [line_bytes]
 
1025
                if no_eol:
 
1026
                    dense_lines.append('\n')
836
1027
                size, bytes = self._record_to_data(key, digest,
837
 
                    lines, [line_bytes])
 
1028
                    lines, dense_lines)
838
1029
            else:
839
1030
                # get mixed annotation + content and feed it into the
840
1031
                # serialiser.
852
1043
        """See VersionedFiles.annotate."""
853
1044
        return self._factory.annotate(self, key)
854
1045
 
855
 
    def check(self, progress_bar=None):
 
1046
    def get_annotator(self):
 
1047
        return _KnitAnnotator(self)
 
1048
 
 
1049
    def check(self, progress_bar=None, keys=None):
856
1050
        """See VersionedFiles.check()."""
 
1051
        if keys is None:
 
1052
            return self._logical_check()
 
1053
        else:
 
1054
            # At the moment, check does not extra work over get_record_stream
 
1055
            return self.get_record_stream(keys, 'unordered', True)
 
1056
 
 
1057
    def _logical_check(self):
857
1058
        # This doesn't actually test extraction of everything, but that will
858
1059
        # impact 'bzr check' substantially, and needs to be integrated with
859
1060
        # care. However, it does check for the obvious problem of a delta with
873
1074
    def _check_add(self, key, lines, random_id, check_content):
874
1075
        """check that version_id and lines are safe to add."""
875
1076
        version_id = key[-1]
876
 
        if contains_whitespace(version_id):
877
 
            raise InvalidRevisionId(version_id, self)
878
 
        self.check_not_reserved_id(version_id)
 
1077
        if version_id is not None:
 
1078
            if contains_whitespace(version_id):
 
1079
                raise InvalidRevisionId(version_id, self)
 
1080
            self.check_not_reserved_id(version_id)
879
1081
        # TODO: If random_id==False and the key is already present, we should
880
1082
        # probably check that the existing content is identical to what is
881
1083
        # being inserted, and otherwise raise an exception.  This would make
891
1093
 
892
1094
    def _check_header_version(self, rec, version_id):
893
1095
        """Checks the header version on original format knit records.
894
 
        
 
1096
 
895
1097
        These have the last component of the key embedded in the record.
896
1098
        """
897
1099
        if rec[1] != version_id:
976
1178
            if missing and not allow_missing:
977
1179
                raise errors.RevisionNotPresent(missing.pop(), self)
978
1180
        return component_data
979
 
       
 
1181
 
980
1182
    def _get_content(self, key, parent_texts={}):
981
1183
        """Returns a content object that makes up the specified
982
1184
        version."""
986
1188
            if not self.get_parent_map([key]):
987
1189
                raise RevisionNotPresent(key, self)
988
1190
            return cached_version
989
 
        text_map, contents_map = self._get_content_maps([key])
990
 
        return contents_map[key]
991
 
 
992
 
    def _get_content_maps(self, keys, nonlocal_keys=None):
993
 
        """Produce maps of text and KnitContents
994
 
        
995
 
        :param keys: The keys to produce content maps for.
996
 
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
997
 
            which are known to not be in this knit, but rather in one of the
998
 
            fallback knits.
999
 
        :return: (text_map, content_map) where text_map contains the texts for
1000
 
            the requested versions and content_map contains the KnitContents.
1001
 
        """
1002
 
        # FUTURE: This function could be improved for the 'extract many' case
1003
 
        # by tracking each component and only doing the copy when the number of
1004
 
        # children than need to apply delta's to it is > 1 or it is part of the
1005
 
        # final output.
1006
 
        keys = list(keys)
1007
 
        multiple_versions = len(keys) != 1
1008
 
        record_map = self._get_record_map(keys, allow_missing=True)
1009
 
 
1010
 
        text_map = {}
1011
 
        content_map = {}
1012
 
        final_content = {}
1013
 
        if nonlocal_keys is None:
1014
 
            nonlocal_keys = set()
1015
 
        else:
1016
 
            nonlocal_keys = frozenset(nonlocal_keys)
1017
 
        missing_keys = set(nonlocal_keys)
1018
 
        for source in self._fallback_vfs:
 
1191
        generator = _VFContentMapGenerator(self, [key])
 
1192
        return generator._get_content(key)
 
1193
 
 
1194
    def get_known_graph_ancestry(self, keys):
 
1195
        """Get a KnownGraph instance with the ancestry of keys."""
 
1196
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1197
        for fallback in self._fallback_vfs:
1019
1198
            if not missing_keys:
1020
1199
                break
1021
 
            for record in source.get_record_stream(missing_keys,
1022
 
                'unordered', True):
1023
 
                if record.storage_kind == 'absent':
1024
 
                    continue
1025
 
                missing_keys.remove(record.key)
1026
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1027
 
                text_map[record.key] = lines
1028
 
                content_map[record.key] = PlainKnitContent(lines, record.key)
1029
 
                if record.key in keys:
1030
 
                    final_content[record.key] = content_map[record.key]
1031
 
        for key in keys:
1032
 
            if key in nonlocal_keys:
1033
 
                # already handled
1034
 
                continue
1035
 
            components = []
1036
 
            cursor = key
1037
 
            while cursor is not None:
1038
 
                try:
1039
 
                    record, record_details, digest, next = record_map[cursor]
1040
 
                except KeyError:
1041
 
                    raise RevisionNotPresent(cursor, self)
1042
 
                components.append((cursor, record, record_details, digest))
1043
 
                cursor = next
1044
 
                if cursor in content_map:
1045
 
                    # no need to plan further back
1046
 
                    components.append((cursor, None, None, None))
1047
 
                    break
1048
 
 
1049
 
            content = None
1050
 
            for (component_id, record, record_details,
1051
 
                 digest) in reversed(components):
1052
 
                if component_id in content_map:
1053
 
                    content = content_map[component_id]
1054
 
                else:
1055
 
                    content, delta = self._factory.parse_record(key[-1],
1056
 
                        record, record_details, content,
1057
 
                        copy_base_content=multiple_versions)
1058
 
                    if multiple_versions:
1059
 
                        content_map[component_id] = content
1060
 
 
1061
 
            final_content[key] = content
1062
 
 
1063
 
            # digest here is the digest from the last applied component.
1064
 
            text = content.text()
1065
 
            actual_sha = sha_strings(text)
1066
 
            if actual_sha != digest:
1067
 
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1068
 
            text_map[key] = text
1069
 
        return text_map, final_content
 
1200
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1201
                                                missing_keys)
 
1202
            parent_map.update(f_parent_map)
 
1203
            missing_keys = f_missing_keys
 
1204
        kg = _mod_graph.KnownGraph(parent_map)
 
1205
        return kg
1070
1206
 
1071
1207
    def get_parent_map(self, keys):
1072
1208
        """Get a map of the graph parents of keys.
1102
1238
 
1103
1239
    def _get_record_map(self, keys, allow_missing=False):
1104
1240
        """Produce a dictionary of knit records.
1105
 
        
 
1241
 
1106
1242
        :return: {key:(record, record_details, digest, next)}
1107
1243
            record
1108
 
                data returned from read_records
 
1244
                data returned from read_records (a KnitContentobject)
1109
1245
            record_details
1110
1246
                opaque information to pass to parse_record
1111
1247
            digest
1114
1250
                build-parent of the version, i.e. the leftmost ancestor.
1115
1251
                Will be None if the record is not a delta.
1116
1252
        :param keys: The keys to build a map for
1117
 
        :param allow_missing: If some records are missing, rather than 
 
1253
        :param allow_missing: If some records are missing, rather than
1118
1254
            error, just return the data that could be generated.
1119
1255
        """
 
1256
        raw_map = self._get_record_map_unparsed(keys,
 
1257
            allow_missing=allow_missing)
 
1258
        return self._raw_map_to_record_map(raw_map)
 
1259
 
 
1260
    def _raw_map_to_record_map(self, raw_map):
 
1261
        """Parse the contents of _get_record_map_unparsed.
 
1262
 
 
1263
        :return: see _get_record_map.
 
1264
        """
 
1265
        result = {}
 
1266
        for key in raw_map:
 
1267
            data, record_details, next = raw_map[key]
 
1268
            content, digest = self._parse_record(key[-1], data)
 
1269
            result[key] = content, record_details, digest, next
 
1270
        return result
 
1271
 
 
1272
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1273
        """Get the raw data for reconstructing keys without parsing it.
 
1274
 
 
1275
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1276
            key-> raw_bytes, (method, noeol), compression_parent
 
1277
        """
1120
1278
        # This retries the whole request if anything fails. Potentially we
1121
1279
        # could be a bit more selective. We could track the keys whose records
1122
1280
        # we have successfully found, and then only request the new records
1132
1290
                # n = next
1133
1291
                records = [(key, i_m) for key, (r, i_m, n)
1134
1292
                                       in position_map.iteritems()]
1135
 
                record_map = {}
1136
 
                for key, record, digest in self._read_records_iter(records):
 
1293
                # Sort by the index memo, so that we request records from the
 
1294
                # same pack file together, and in forward-sorted order
 
1295
                records.sort(key=operator.itemgetter(1))
 
1296
                raw_record_map = {}
 
1297
                for key, data in self._read_records_iter_unchecked(records):
1137
1298
                    (record_details, index_memo, next) = position_map[key]
1138
 
                    record_map[key] = record, record_details, digest, next
1139
 
                return record_map
 
1299
                    raw_record_map[key] = data, record_details, next
 
1300
                return raw_record_map
1140
1301
            except errors.RetryWithNewPacks, e:
1141
1302
                self._access.reload_or_raise(e)
1142
1303
 
1143
 
    def _split_by_prefix(self, keys):
 
1304
    @classmethod
 
1305
    def _split_by_prefix(cls, keys):
1144
1306
        """For the given keys, split them up based on their prefix.
1145
1307
 
1146
1308
        To keep memory pressure somewhat under control, split the
1149
1311
        This should be revisited if _get_content_maps() can ever cross
1150
1312
        file-id boundaries.
1151
1313
 
 
1314
        The keys for a given file_id are kept in the same relative order.
 
1315
        Ordering between file_ids is not, though prefix_order will return the
 
1316
        order that the key was first seen.
 
1317
 
1152
1318
        :param keys: An iterable of key tuples
1153
 
        :return: A dict of {prefix: [key_list]}
 
1319
        :return: (split_map, prefix_order)
 
1320
            split_map       A dictionary mapping prefix => keys
 
1321
            prefix_order    The order that we saw the various prefixes
1154
1322
        """
1155
1323
        split_by_prefix = {}
 
1324
        prefix_order = []
1156
1325
        for key in keys:
1157
1326
            if len(key) == 1:
1158
 
                split_by_prefix.setdefault('', []).append(key)
1159
 
            else:
1160
 
                split_by_prefix.setdefault(key[0], []).append(key)
1161
 
        return split_by_prefix
 
1327
                prefix = ''
 
1328
            else:
 
1329
                prefix = key[0]
 
1330
 
 
1331
            if prefix in split_by_prefix:
 
1332
                split_by_prefix[prefix].append(key)
 
1333
            else:
 
1334
                split_by_prefix[prefix] = [key]
 
1335
                prefix_order.append(prefix)
 
1336
        return split_by_prefix, prefix_order
 
1337
 
 
1338
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1339
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1340
        """For the given keys, group them into 'best-sized' requests.
 
1341
 
 
1342
        The idea is to avoid making 1 request per file, but to never try to
 
1343
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1344
        possible, we should try to group requests to the same pack file
 
1345
        together.
 
1346
 
 
1347
        :return: list of (keys, non_local) tuples that indicate what keys
 
1348
            should be fetched next.
 
1349
        """
 
1350
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1351
        #       from the same pack file together, and we want to extract all
 
1352
        #       the texts for a given build-chain together. Ultimately it
 
1353
        #       probably needs a better global view.
 
1354
        total_keys = len(keys)
 
1355
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1356
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1357
        cur_keys = []
 
1358
        cur_non_local = set()
 
1359
        cur_size = 0
 
1360
        result = []
 
1361
        sizes = []
 
1362
        for prefix in prefix_order:
 
1363
            keys = prefix_split_keys[prefix]
 
1364
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1365
 
 
1366
            this_size = self._index._get_total_build_size(keys, positions)
 
1367
            cur_size += this_size
 
1368
            cur_keys.extend(keys)
 
1369
            cur_non_local.update(non_local)
 
1370
            if cur_size > _min_buffer_size:
 
1371
                result.append((cur_keys, cur_non_local))
 
1372
                sizes.append(cur_size)
 
1373
                cur_keys = []
 
1374
                cur_non_local = set()
 
1375
                cur_size = 0
 
1376
        if cur_keys:
 
1377
            result.append((cur_keys, cur_non_local))
 
1378
            sizes.append(cur_size)
 
1379
        return result
1162
1380
 
1163
1381
    def get_record_stream(self, keys, ordering, include_delta_closure):
1164
1382
        """Get a stream of records for keys.
1177
1395
        if not keys:
1178
1396
            return
1179
1397
        if not self._index.has_graph:
1180
 
            # Cannot topological order when no graph has been stored.
 
1398
            # Cannot sort when no graph has been stored.
1181
1399
            ordering = 'unordered'
1182
1400
 
1183
1401
        remaining_keys = keys
1206
1424
        absent_keys = keys.difference(set(positions))
1207
1425
        # There may be more absent keys : if we're missing the basis component
1208
1426
        # and are trying to include the delta closure.
 
1427
        # XXX: We should not ever need to examine remote sources because we do
 
1428
        # not permit deltas across versioned files boundaries.
1209
1429
        if include_delta_closure:
1210
1430
            needed_from_fallback = set()
1211
1431
            # Build up reconstructable_keys dict.  key:True in this dict means
1237
1457
                    needed_from_fallback.add(key)
1238
1458
        # Double index lookups here : need a unified api ?
1239
1459
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
1240
 
        if ordering == 'topological':
1241
 
            # Global topological sort
1242
 
            present_keys = tsort.topo_sort(global_map)
 
1460
        if ordering in ('topological', 'groupcompress'):
 
1461
            if ordering == 'topological':
 
1462
                # Global topological sort
 
1463
                present_keys = tsort.topo_sort(global_map)
 
1464
            else:
 
1465
                present_keys = sort_groupcompress(global_map)
1243
1466
            # Now group by source:
1244
1467
            source_keys = []
1245
1468
            current_source = None
1255
1478
        else:
1256
1479
            if ordering != 'unordered':
1257
1480
                raise AssertionError('valid values for ordering are:'
1258
 
                    ' "unordered" or "topological" not: %r'
 
1481
                    ' "unordered", "groupcompress" or "topological" not: %r'
1259
1482
                    % (ordering,))
1260
1483
            # Just group by source; remote sources first.
1261
1484
            present_keys = []
1283
1506
            # XXX: get_content_maps performs its own index queries; allow state
1284
1507
            # to be passed in.
1285
1508
            non_local_keys = needed_from_fallback - absent_keys
1286
 
            prefix_split_keys = self._split_by_prefix(present_keys)
1287
 
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1288
 
            for prefix, keys in prefix_split_keys.iteritems():
1289
 
                non_local = prefix_split_non_local_keys.get(prefix, [])
1290
 
                non_local = set(non_local)
1291
 
                text_map, _ = self._get_content_maps(keys, non_local)
1292
 
                for key in keys:
1293
 
                    lines = text_map.pop(key)
1294
 
                    yield ChunkedContentFactory(key, global_map[key], None,
1295
 
                                                lines)
 
1509
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1510
                                                                non_local_keys,
 
1511
                                                                positions):
 
1512
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1513
                                                   global_map,
 
1514
                                                   ordering=ordering)
 
1515
                for record in generator.get_record_stream():
 
1516
                    yield record
1296
1517
        else:
1297
1518
            for source, keys in source_keys:
1298
1519
                if source is parent_maps[0]:
1299
1520
                    # this KnitVersionedFiles
1300
1521
                    records = [(key, positions[key][1]) for key in keys]
1301
 
                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
 
1522
                    for key, raw_data in self._read_records_iter_unchecked(records):
1302
1523
                        (record_details, index_memo, _) = positions[key]
1303
1524
                        yield KnitContentFactory(key, global_map[key],
1304
 
                            record_details, sha1, raw_data, self._factory.annotated, None)
 
1525
                            record_details, None, raw_data, self._factory.annotated, None)
1305
1526
                else:
1306
1527
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
1307
1528
                    for record in vf.get_record_stream(keys, ordering,
1330
1551
    def insert_record_stream(self, stream):
1331
1552
        """Insert a record stream into this container.
1332
1553
 
1333
 
        :param stream: A stream of records to insert. 
 
1554
        :param stream: A stream of records to insert.
1334
1555
        :return: None
1335
1556
        :seealso VersionedFiles.get_record_stream:
1336
1557
        """
1376
1597
        # key = basis_parent, value = index entry to add
1377
1598
        buffered_index_entries = {}
1378
1599
        for record in stream:
 
1600
            kind = record.storage_kind
 
1601
            if kind.startswith('knit-') and kind.endswith('-gz'):
 
1602
                # Check that the ID in the header of the raw knit bytes matches
 
1603
                # the record metadata.
 
1604
                raw_data = record._raw_record
 
1605
                df, rec = self._parse_record_header(record.key, raw_data)
 
1606
                df.close()
 
1607
            buffered = False
1379
1608
            parents = record.parents
1380
1609
            if record.storage_kind in delta_types:
1381
1610
                # TODO: eventually the record itself should track
1408
1637
                    except KeyError:
1409
1638
                        adapter_key = (record.storage_kind, "knit-ft-gz")
1410
1639
                        adapter = get_adapter(adapter_key)
1411
 
                    bytes = adapter.get_bytes(
1412
 
                        record, record.get_bytes_as(record.storage_kind))
 
1640
                    bytes = adapter.get_bytes(record)
1413
1641
                else:
1414
 
                    bytes = record.get_bytes_as(record.storage_kind)
 
1642
                    # It's a knit record, it has a _raw_record field (even if
 
1643
                    # it was reconstituted from a network stream).
 
1644
                    bytes = record._raw_record
1415
1645
                options = [record._build_details[0]]
1416
1646
                if record._build_details[1]:
1417
1647
                    options.append('no-eol')
1426
1656
                access_memo = self._access.add_raw_records(
1427
1657
                    [(record.key, len(bytes))], bytes)[0]
1428
1658
                index_entry = (record.key, options, access_memo, parents)
1429
 
                buffered = False
1430
1659
                if 'fulltext' not in options:
1431
1660
                    # Not a fulltext, so we need to make sure the compression
1432
1661
                    # parent will also be present.
1448
1677
            elif record.storage_kind == 'chunked':
1449
1678
                self.add_lines(record.key, parents,
1450
1679
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1451
 
            elif record.storage_kind == 'fulltext':
1452
 
                self.add_lines(record.key, parents,
1453
 
                    split_lines(record.get_bytes_as('fulltext')))
1454
1680
            else:
1455
 
                # Not a fulltext, and not suitable for direct insertion as a
 
1681
                # Not suitable for direct insertion as a
1456
1682
                # delta, either because it's not the right format, or this
1457
1683
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1458
1684
                # 0) or because it depends on a base only present in the
1459
1685
                # fallback kvfs.
1460
 
                adapter_key = record.storage_kind, 'fulltext'
1461
 
                adapter = get_adapter(adapter_key)
1462
 
                lines = split_lines(adapter.get_bytes(
1463
 
                    record, record.get_bytes_as(record.storage_kind)))
 
1686
                self._access.flush()
 
1687
                try:
 
1688
                    # Try getting a fulltext directly from the record.
 
1689
                    bytes = record.get_bytes_as('fulltext')
 
1690
                except errors.UnavailableRepresentation:
 
1691
                    adapter_key = record.storage_kind, 'fulltext'
 
1692
                    adapter = get_adapter(adapter_key)
 
1693
                    bytes = adapter.get_bytes(record)
 
1694
                lines = split_lines(bytes)
1464
1695
                try:
1465
1696
                    self.add_lines(record.key, parents, lines)
1466
1697
                except errors.RevisionAlreadyPresent:
1467
1698
                    pass
1468
1699
            # Add any records whose basis parent is now available.
1469
 
            added_keys = [record.key]
1470
 
            while added_keys:
1471
 
                key = added_keys.pop(0)
1472
 
                if key in buffered_index_entries:
1473
 
                    index_entries = buffered_index_entries[key]
1474
 
                    self._index.add_records(index_entries)
1475
 
                    added_keys.extend(
1476
 
                        [index_entry[0] for index_entry in index_entries])
1477
 
                    del buffered_index_entries[key]
1478
 
        # If there were any deltas which had a missing basis parent, error.
 
1700
            if not buffered:
 
1701
                added_keys = [record.key]
 
1702
                while added_keys:
 
1703
                    key = added_keys.pop(0)
 
1704
                    if key in buffered_index_entries:
 
1705
                        index_entries = buffered_index_entries[key]
 
1706
                        self._index.add_records(index_entries)
 
1707
                        added_keys.extend(
 
1708
                            [index_entry[0] for index_entry in index_entries])
 
1709
                        del buffered_index_entries[key]
1479
1710
        if buffered_index_entries:
1480
 
            from pprint import pformat
1481
 
            raise errors.BzrCheckError(
1482
 
                "record_stream refers to compression parents not in %r:\n%s"
1483
 
                % (self, pformat(sorted(buffered_index_entries.keys()))))
 
1711
            # There were index entries buffered at the end of the stream,
 
1712
            # So these need to be added (if the index supports holding such
 
1713
            # entries for later insertion)
 
1714
            all_entries = []
 
1715
            for key in buffered_index_entries:
 
1716
                index_entries = buffered_index_entries[key]
 
1717
                all_entries.extend(index_entries)
 
1718
            self._index.add_records(
 
1719
                all_entries, missing_compression_parents=True)
 
1720
 
 
1721
    def get_missing_compression_parent_keys(self):
 
1722
        """Return an iterable of keys of missing compression parents.
 
1723
 
 
1724
        Check this after calling insert_record_stream to find out if there are
 
1725
        any missing compression parents.  If there are, the records that
 
1726
        depend on them are not able to be inserted safely. For atomic
 
1727
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1728
        suspended - commit will fail at this point. Nonatomic knits will error
 
1729
        earlier because they have no staging area to put pending entries into.
 
1730
        """
 
1731
        return self._index.get_missing_compression_parents()
1484
1732
 
1485
1733
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1486
1734
        """Iterate over the lines in the versioned files from keys.
1503
1751
         * If a requested key did not change any lines (or didn't have any
1504
1752
           lines), it may not be mentioned at all in the result.
1505
1753
 
 
1754
        :param pb: Progress bar supplied by caller.
1506
1755
        :return: An iterator over (line, key).
1507
1756
        """
1508
1757
        if pb is None:
1522
1771
                        key_records.append((key, details[0]))
1523
1772
                records_iter = enumerate(self._read_records_iter(key_records))
1524
1773
                for (key_idx, (key, data, sha_value)) in records_iter:
1525
 
                    pb.update('Walking content.', key_idx, total)
 
1774
                    pb.update('Walking content', key_idx, total)
1526
1775
                    compression_parent = build_details[key][1]
1527
1776
                    if compression_parent is None:
1528
1777
                        # fulltext
1529
1778
                        line_iterator = self._factory.get_fulltext_content(data)
1530
1779
                    else:
1531
 
                        # Delta 
 
1780
                        # Delta
1532
1781
                        line_iterator = self._factory.get_linedelta_content(data)
1533
1782
                    # Now that we are yielding the data for this key, remove it
1534
1783
                    # from the list
1545
1794
        # If there are still keys we've not yet found, we look in the fallback
1546
1795
        # vfs, and hope to find them there.  Note that if the keys are found
1547
1796
        # but had no changes or no content, the fallback may not return
1548
 
        # anything.  
 
1797
        # anything.
1549
1798
        if keys and not self._fallback_vfs:
1550
1799
            # XXX: strictly the second parameter is meant to be the file id
1551
1800
            # but it's not easily accessible here.
1558
1807
                source_keys.add(key)
1559
1808
                yield line, key
1560
1809
            keys.difference_update(source_keys)
1561
 
        pb.update('Walking content.', total, total)
 
1810
        pb.update('Walking content', total, total)
1562
1811
 
1563
1812
    def _make_line_delta(self, delta_seq, new_content):
1564
1813
        """Generate a line delta from delta_seq and new_content."""
1573
1822
                           delta=None, annotated=None,
1574
1823
                           left_matching_blocks=None):
1575
1824
        """Merge annotations for content and generate deltas.
1576
 
        
 
1825
 
1577
1826
        This is done by comparing the annotations based on changes to the text
1578
1827
        and generating a delta on the resulting full texts. If annotations are
1579
1828
        not being created then a simple delta is created.
1661
1910
                                 rec[1], record_contents))
1662
1911
        if last_line != 'end %s\n' % rec[1]:
1663
1912
            raise KnitCorrupt(self,
1664
 
                              'unexpected version end line %r, wanted %r' 
 
1913
                              'unexpected version end line %r, wanted %r'
1665
1914
                              % (last_line, rec[1]))
1666
1915
        df.close()
1667
1916
        return rec, record_contents
1684
1933
        if not needed_records:
1685
1934
            return
1686
1935
 
1687
 
        # The transport optimizes the fetching as well 
 
1936
        # The transport optimizes the fetching as well
1688
1937
        # (ie, reads continuous ranges.)
1689
1938
        raw_data = self._access.get_raw_records(
1690
1939
            [index_memo for key, index_memo in needed_records])
1700
1949
        This unpacks enough of the text record to validate the id is
1701
1950
        as expected but thats all.
1702
1951
 
1703
 
        Each item the iterator yields is (key, bytes, sha1_of_full_text).
 
1952
        Each item the iterator yields is (key, bytes,
 
1953
            expected_sha1_of_full_text).
 
1954
        """
 
1955
        for key, data in self._read_records_iter_unchecked(records):
 
1956
            # validate the header (note that we can only use the suffix in
 
1957
            # current knit records).
 
1958
            df, rec = self._parse_record_header(key, data)
 
1959
            df.close()
 
1960
            yield key, data, rec[3]
 
1961
 
 
1962
    def _read_records_iter_unchecked(self, records):
 
1963
        """Read text records from data file and yield raw data.
 
1964
 
 
1965
        No validation is done.
 
1966
 
 
1967
        Yields tuples of (key, data).
1704
1968
        """
1705
1969
        # setup an iterator of the external records:
1706
1970
        # uses readv so nice and fast we hope.
1712
1976
 
1713
1977
        for key, index_memo in records:
1714
1978
            data = raw_records.next()
1715
 
            # validate the header (note that we can only use the suffix in
1716
 
            # current knit records).
1717
 
            df, rec = self._parse_record_header(key, data)
1718
 
            df.close()
1719
 
            yield key, data, rec[3]
 
1979
            yield key, data
1720
1980
 
1721
1981
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1722
1982
        """Convert key, digest, lines into a raw data block.
1723
 
        
 
1983
 
1724
1984
        :param key: The key of the record. Currently keys are always serialised
1725
1985
            using just the trailing component.
1726
1986
        :param dense_lines: The bytes of lines but in a denser form. For
1731
1991
            function spends less time resizing the final string.
1732
1992
        :return: (len, a StringIO instance with the raw data ready to read.)
1733
1993
        """
1734
 
        # Note: using a string copy here increases memory pressure with e.g.
1735
 
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
1736
 
        # when doing the initial commit of a mozilla tree. RBC 20070921
1737
 
        bytes = ''.join(chain(
1738
 
            ["version %s %d %s\n" % (key[-1],
1739
 
                                     len(lines),
1740
 
                                     digest)],
1741
 
            dense_lines or lines,
1742
 
            ["end %s\n" % key[-1]]))
1743
 
        if type(bytes) != str:
1744
 
            raise AssertionError(
1745
 
                'data must be plain bytes was %s' % type(bytes))
 
1994
        chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
 
1995
        chunks.extend(dense_lines or lines)
 
1996
        chunks.append("end %s\n" % key[-1])
 
1997
        for chunk in chunks:
 
1998
            if type(chunk) is not str:
 
1999
                raise AssertionError(
 
2000
                    'data must be plain bytes was %s' % type(chunk))
1746
2001
        if lines and lines[-1][-1] != '\n':
1747
2002
            raise ValueError('corrupt lines value %r' % lines)
1748
 
        compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
 
2003
        compressed_bytes = tuned_gzip.chunks_to_gzip(chunks)
1749
2004
        return len(compressed_bytes), compressed_bytes
1750
2005
 
1751
2006
    def _split_header(self, line):
1766
2021
        return result
1767
2022
 
1768
2023
 
 
2024
class _ContentMapGenerator(object):
 
2025
    """Generate texts or expose raw deltas for a set of texts."""
 
2026
 
 
2027
    def __init__(self, ordering='unordered'):
 
2028
        self._ordering = ordering
 
2029
 
 
2030
    def _get_content(self, key):
 
2031
        """Get the content object for key."""
 
2032
        # Note that _get_content is only called when the _ContentMapGenerator
 
2033
        # has been constructed with just one key requested for reconstruction.
 
2034
        if key in self.nonlocal_keys:
 
2035
            record = self.get_record_stream().next()
 
2036
            # Create a content object on the fly
 
2037
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
2038
            return PlainKnitContent(lines, record.key)
 
2039
        else:
 
2040
            # local keys we can ask for directly
 
2041
            return self._get_one_work(key)
 
2042
 
 
2043
    def get_record_stream(self):
 
2044
        """Get a record stream for the keys requested during __init__."""
 
2045
        for record in self._work():
 
2046
            yield record
 
2047
 
 
2048
    def _work(self):
 
2049
        """Produce maps of text and KnitContents as dicts.
 
2050
 
 
2051
        :return: (text_map, content_map) where text_map contains the texts for
 
2052
            the requested versions and content_map contains the KnitContents.
 
2053
        """
 
2054
        # NB: By definition we never need to read remote sources unless texts
 
2055
        # are requested from them: we don't delta across stores - and we
 
2056
        # explicitly do not want to to prevent data loss situations.
 
2057
        if self.global_map is None:
 
2058
            self.global_map = self.vf.get_parent_map(self.keys)
 
2059
        nonlocal_keys = self.nonlocal_keys
 
2060
 
 
2061
        missing_keys = set(nonlocal_keys)
 
2062
        # Read from remote versioned file instances and provide to our caller.
 
2063
        for source in self.vf._fallback_vfs:
 
2064
            if not missing_keys:
 
2065
                break
 
2066
            # Loop over fallback repositories asking them for texts - ignore
 
2067
            # any missing from a particular fallback.
 
2068
            for record in source.get_record_stream(missing_keys,
 
2069
                self._ordering, True):
 
2070
                if record.storage_kind == 'absent':
 
2071
                    # Not in thie particular stream, may be in one of the
 
2072
                    # other fallback vfs objects.
 
2073
                    continue
 
2074
                missing_keys.remove(record.key)
 
2075
                yield record
 
2076
 
 
2077
        if self._raw_record_map is None:
 
2078
            raise AssertionError('_raw_record_map should have been filled')
 
2079
        first = True
 
2080
        for key in self.keys:
 
2081
            if key in self.nonlocal_keys:
 
2082
                continue
 
2083
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2084
            first = False
 
2085
 
 
2086
    def _get_one_work(self, requested_key):
 
2087
        # Now, if we have calculated everything already, just return the
 
2088
        # desired text.
 
2089
        if requested_key in self._contents_map:
 
2090
            return self._contents_map[requested_key]
 
2091
        # To simplify things, parse everything at once - code that wants one text
 
2092
        # probably wants them all.
 
2093
        # FUTURE: This function could be improved for the 'extract many' case
 
2094
        # by tracking each component and only doing the copy when the number of
 
2095
        # children than need to apply delta's to it is > 1 or it is part of the
 
2096
        # final output.
 
2097
        multiple_versions = len(self.keys) != 1
 
2098
        if self._record_map is None:
 
2099
            self._record_map = self.vf._raw_map_to_record_map(
 
2100
                self._raw_record_map)
 
2101
        record_map = self._record_map
 
2102
        # raw_record_map is key:
 
2103
        # Have read and parsed records at this point.
 
2104
        for key in self.keys:
 
2105
            if key in self.nonlocal_keys:
 
2106
                # already handled
 
2107
                continue
 
2108
            components = []
 
2109
            cursor = key
 
2110
            while cursor is not None:
 
2111
                try:
 
2112
                    record, record_details, digest, next = record_map[cursor]
 
2113
                except KeyError:
 
2114
                    raise RevisionNotPresent(cursor, self)
 
2115
                components.append((cursor, record, record_details, digest))
 
2116
                cursor = next
 
2117
                if cursor in self._contents_map:
 
2118
                    # no need to plan further back
 
2119
                    components.append((cursor, None, None, None))
 
2120
                    break
 
2121
 
 
2122
            content = None
 
2123
            for (component_id, record, record_details,
 
2124
                 digest) in reversed(components):
 
2125
                if component_id in self._contents_map:
 
2126
                    content = self._contents_map[component_id]
 
2127
                else:
 
2128
                    content, delta = self._factory.parse_record(key[-1],
 
2129
                        record, record_details, content,
 
2130
                        copy_base_content=multiple_versions)
 
2131
                    if multiple_versions:
 
2132
                        self._contents_map[component_id] = content
 
2133
 
 
2134
            # digest here is the digest from the last applied component.
 
2135
            text = content.text()
 
2136
            actual_sha = sha_strings(text)
 
2137
            if actual_sha != digest:
 
2138
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2139
        if multiple_versions:
 
2140
            return self._contents_map[requested_key]
 
2141
        else:
 
2142
            return content
 
2143
 
 
2144
    def _wire_bytes(self):
 
2145
        """Get the bytes to put on the wire for 'key'.
 
2146
 
 
2147
        The first collection of bytes asked for returns the serialised
 
2148
        raw_record_map and the additional details (key, parent) for key.
 
2149
        Subsequent calls return just the additional details (key, parent).
 
2150
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2151
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2152
 
 
2153
        :param key: A key from the content generator.
 
2154
        :return: Bytes to put on the wire.
 
2155
        """
 
2156
        lines = []
 
2157
        # kind marker for dispatch on the far side,
 
2158
        lines.append('knit-delta-closure')
 
2159
        # Annotated or not
 
2160
        if self.vf._factory.annotated:
 
2161
            lines.append('annotated')
 
2162
        else:
 
2163
            lines.append('')
 
2164
        # then the list of keys
 
2165
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
2166
            if key not in self.nonlocal_keys]))
 
2167
        # then the _raw_record_map in serialised form:
 
2168
        map_byte_list = []
 
2169
        # for each item in the map:
 
2170
        # 1 line with key
 
2171
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2172
        # one line with method
 
2173
        # one line with noeol
 
2174
        # one line with next ('' for None)
 
2175
        # one line with byte count of the record bytes
 
2176
        # the record bytes
 
2177
        for key, (record_bytes, (method, noeol), next) in \
 
2178
            self._raw_record_map.iteritems():
 
2179
            key_bytes = '\x00'.join(key)
 
2180
            parents = self.global_map.get(key, None)
 
2181
            if parents is None:
 
2182
                parent_bytes = 'None:'
 
2183
            else:
 
2184
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2185
            method_bytes = method
 
2186
            if noeol:
 
2187
                noeol_bytes = "T"
 
2188
            else:
 
2189
                noeol_bytes = "F"
 
2190
            if next:
 
2191
                next_bytes = '\x00'.join(next)
 
2192
            else:
 
2193
                next_bytes = ''
 
2194
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2195
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2196
                len(record_bytes), record_bytes))
 
2197
        map_bytes = ''.join(map_byte_list)
 
2198
        lines.append(map_bytes)
 
2199
        bytes = '\n'.join(lines)
 
2200
        return bytes
 
2201
 
 
2202
 
 
2203
class _VFContentMapGenerator(_ContentMapGenerator):
 
2204
    """Content map generator reading from a VersionedFiles object."""
 
2205
 
 
2206
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2207
        global_map=None, raw_record_map=None, ordering='unordered'):
 
2208
        """Create a _ContentMapGenerator.
 
2209
 
 
2210
        :param versioned_files: The versioned files that the texts are being
 
2211
            extracted from.
 
2212
        :param keys: The keys to produce content maps for.
 
2213
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2214
            which are known to not be in this knit, but rather in one of the
 
2215
            fallback knits.
 
2216
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2217
            This is required if get_record_stream() is to be used.
 
2218
        :param raw_record_map: A unparsed raw record map to use for answering
 
2219
            contents.
 
2220
        """
 
2221
        _ContentMapGenerator.__init__(self, ordering=ordering)
 
2222
        # The vf to source data from
 
2223
        self.vf = versioned_files
 
2224
        # The keys desired
 
2225
        self.keys = list(keys)
 
2226
        # Keys known to be in fallback vfs objects
 
2227
        if nonlocal_keys is None:
 
2228
            self.nonlocal_keys = set()
 
2229
        else:
 
2230
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2231
        # Parents data for keys to be returned in get_record_stream
 
2232
        self.global_map = global_map
 
2233
        # The chunked lists for self.keys in text form
 
2234
        self._text_map = {}
 
2235
        # A cache of KnitContent objects used in extracting texts.
 
2236
        self._contents_map = {}
 
2237
        # All the knit records needed to assemble the requested keys as full
 
2238
        # texts.
 
2239
        self._record_map = None
 
2240
        if raw_record_map is None:
 
2241
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2242
                allow_missing=True)
 
2243
        else:
 
2244
            self._raw_record_map = raw_record_map
 
2245
        # the factory for parsing records
 
2246
        self._factory = self.vf._factory
 
2247
 
 
2248
 
 
2249
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2250
    """Content map generator sourced from a network stream."""
 
2251
 
 
2252
    def __init__(self, bytes, line_end):
 
2253
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2254
        self._bytes = bytes
 
2255
        self.global_map = {}
 
2256
        self._raw_record_map = {}
 
2257
        self._contents_map = {}
 
2258
        self._record_map = None
 
2259
        self.nonlocal_keys = []
 
2260
        # Get access to record parsing facilities
 
2261
        self.vf = KnitVersionedFiles(None, None)
 
2262
        start = line_end
 
2263
        # Annotated or not
 
2264
        line_end = bytes.find('\n', start)
 
2265
        line = bytes[start:line_end]
 
2266
        start = line_end + 1
 
2267
        if line == 'annotated':
 
2268
            self._factory = KnitAnnotateFactory()
 
2269
        else:
 
2270
            self._factory = KnitPlainFactory()
 
2271
        # list of keys to emit in get_record_stream
 
2272
        line_end = bytes.find('\n', start)
 
2273
        line = bytes[start:line_end]
 
2274
        start = line_end + 1
 
2275
        self.keys = [
 
2276
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2277
            if segment]
 
2278
        # now a loop until the end. XXX: It would be nice if this was just a
 
2279
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2280
        # there is a decent sized gap stopping that at the moment.
 
2281
        end = len(bytes)
 
2282
        while start < end:
 
2283
            # 1 line with key
 
2284
            line_end = bytes.find('\n', start)
 
2285
            key = tuple(bytes[start:line_end].split('\x00'))
 
2286
            start = line_end + 1
 
2287
            # 1 line with parents (None: for None, '' for ())
 
2288
            line_end = bytes.find('\n', start)
 
2289
            line = bytes[start:line_end]
 
2290
            if line == 'None:':
 
2291
                parents = None
 
2292
            else:
 
2293
                parents = tuple(
 
2294
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2295
                     if segment])
 
2296
            self.global_map[key] = parents
 
2297
            start = line_end + 1
 
2298
            # one line with method
 
2299
            line_end = bytes.find('\n', start)
 
2300
            line = bytes[start:line_end]
 
2301
            method = line
 
2302
            start = line_end + 1
 
2303
            # one line with noeol
 
2304
            line_end = bytes.find('\n', start)
 
2305
            line = bytes[start:line_end]
 
2306
            noeol = line == "T"
 
2307
            start = line_end + 1
 
2308
            # one line with next ('' for None)
 
2309
            line_end = bytes.find('\n', start)
 
2310
            line = bytes[start:line_end]
 
2311
            if not line:
 
2312
                next = None
 
2313
            else:
 
2314
                next = tuple(bytes[start:line_end].split('\x00'))
 
2315
            start = line_end + 1
 
2316
            # one line with byte count of the record bytes
 
2317
            line_end = bytes.find('\n', start)
 
2318
            line = bytes[start:line_end]
 
2319
            count = int(line)
 
2320
            start = line_end + 1
 
2321
            # the record bytes
 
2322
            record_bytes = bytes[start:start+count]
 
2323
            start = start + count
 
2324
            # put it in the map
 
2325
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2326
 
 
2327
    def get_record_stream(self):
 
2328
        """Get a record stream for for keys requested by the bytestream."""
 
2329
        first = True
 
2330
        for key in self.keys:
 
2331
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2332
            first = False
 
2333
 
 
2334
    def _wire_bytes(self):
 
2335
        return self._bytes
 
2336
 
 
2337
 
1769
2338
class _KndxIndex(object):
1770
2339
    """Manages knit index files
1771
2340
 
1785
2354
 
1786
2355
    Duplicate entries may be written to the index for a single version id
1787
2356
    if this is done then the latter one completely replaces the former:
1788
 
    this allows updates to correct version and parent information. 
 
2357
    this allows updates to correct version and parent information.
1789
2358
    Note that the two entries may share the delta, and that successive
1790
2359
    annotations and references MUST point to the first entry.
1791
2360
 
1792
2361
    The index file on disc contains a header, followed by one line per knit
1793
2362
    record. The same revision can be present in an index file more than once.
1794
 
    The first occurrence gets assigned a sequence number starting from 0. 
1795
 
    
 
2363
    The first occurrence gets assigned a sequence number starting from 0.
 
2364
 
1796
2365
    The format of a single line is
1797
2366
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
1798
2367
    REVISION_ID is a utf8-encoded revision id
1799
 
    FLAGS is a comma separated list of flags about the record. Values include 
 
2368
    FLAGS is a comma separated list of flags about the record. Values include
1800
2369
        no-eol, line-delta, fulltext.
1801
2370
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
1802
 
        that the the compressed data starts at.
 
2371
        that the compressed data starts at.
1803
2372
    LENGTH is the ascii representation of the length of the data file.
1804
2373
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
1805
2374
        REVISION_ID.
1806
2375
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
1807
2376
        revision id already in the knit that is a parent of REVISION_ID.
1808
2377
    The ' :' marker is the end of record marker.
1809
 
    
 
2378
 
1810
2379
    partial writes:
1811
2380
    when a write is interrupted to the index file, it will result in a line
1812
2381
    that does not end in ' :'. If the ' :' is not present at the end of a line,
1837
2406
        self._reset_cache()
1838
2407
        self.has_graph = True
1839
2408
 
1840
 
    def add_records(self, records, random_id=False):
 
2409
    def add_records(self, records, random_id=False, missing_compression_parents=False):
1841
2410
        """Add multiple records to the index.
1842
 
        
 
2411
 
1843
2412
        :param records: a list of tuples:
1844
2413
                         (key, options, access_memo, parents).
1845
2414
        :param random_id: If True the ids being added were randomly generated
1846
2415
            and no check for existence will be performed.
 
2416
        :param missing_compression_parents: If True the records being added are
 
2417
            only compressed against texts already in the index (or inside
 
2418
            records). If False the records all refer to unavailable texts (or
 
2419
            texts inside records) as compression parents.
1847
2420
        """
 
2421
        if missing_compression_parents:
 
2422
            # It might be nice to get the edge of the records. But keys isn't
 
2423
            # _wrong_.
 
2424
            keys = sorted(record[0] for record in records)
 
2425
            raise errors.RevisionNotPresent(keys, self)
1848
2426
        paths = {}
1849
2427
        for record in records:
1850
2428
            key = record[0]
1867
2445
                    line = "\n%s %s %s %s %s :" % (
1868
2446
                        key[-1], ','.join(options), pos, size,
1869
2447
                        self._dictionary_compress(parents))
1870
 
                    if type(line) != str:
 
2448
                    if type(line) is not str:
1871
2449
                        raise AssertionError(
1872
2450
                            'data must be utf8 was %s' % type(line))
1873
2451
                    lines.append(line)
1881
2459
                self._kndx_cache[prefix] = (orig_cache, orig_history)
1882
2460
                raise
1883
2461
 
 
2462
    def scan_unvalidated_index(self, graph_index):
 
2463
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2464
        # Because kndx files do not support atomic insertion via separate index
 
2465
        # files, they do not support this method.
 
2466
        raise NotImplementedError(self.scan_unvalidated_index)
 
2467
 
 
2468
    def get_missing_compression_parents(self):
 
2469
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2470
        # Because kndx files do not support atomic insertion via separate index
 
2471
        # files, they do not support this method.
 
2472
        raise NotImplementedError(self.get_missing_compression_parents)
 
2473
 
1884
2474
    def _cache_key(self, key, options, pos, size, parent_keys):
1885
2475
        """Cache a version record in the history array and index cache.
1886
2476
 
1993
2583
        except KeyError:
1994
2584
            raise RevisionNotPresent(key, self)
1995
2585
 
 
2586
    def find_ancestry(self, keys):
 
2587
        """See CombinedGraphIndex.find_ancestry()"""
 
2588
        prefixes = set(key[:-1] for key in keys)
 
2589
        self._load_prefixes(prefixes)
 
2590
        result = {}
 
2591
        parent_map = {}
 
2592
        missing_keys = set()
 
2593
        pending_keys = list(keys)
 
2594
        # This assumes that keys will not reference parents in a different
 
2595
        # prefix, which is accurate so far.
 
2596
        while pending_keys:
 
2597
            key = pending_keys.pop()
 
2598
            if key in parent_map:
 
2599
                continue
 
2600
            prefix = key[:-1]
 
2601
            try:
 
2602
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2603
            except KeyError:
 
2604
                missing_keys.add(key)
 
2605
            else:
 
2606
                parent_keys = tuple([prefix + (suffix,)
 
2607
                                     for suffix in suffix_parents])
 
2608
                parent_map[key] = parent_keys
 
2609
                pending_keys.extend([p for p in parent_keys
 
2610
                                        if p not in parent_map])
 
2611
        return parent_map, missing_keys
 
2612
 
1996
2613
    def get_parent_map(self, keys):
1997
2614
        """Get a map of the parents of keys.
1998
2615
 
2019
2636
 
2020
2637
    def get_position(self, key):
2021
2638
        """Return details needed to access the version.
2022
 
        
 
2639
 
2023
2640
        :return: a tuple (key, data position, size) to hand to the access
2024
2641
            logic to get the record.
2025
2642
        """
2029
2646
        return key, entry[2], entry[3]
2030
2647
 
2031
2648
    has_key = _mod_index._has_key_from_parent_map
2032
 
    
 
2649
 
2033
2650
    def _init_index(self, path, extra_lines=[]):
2034
2651
        """Initialize an index."""
2035
2652
        sio = StringIO()
2044
2661
 
2045
2662
    def keys(self):
2046
2663
        """Get all the keys in the collection.
2047
 
        
 
2664
 
2048
2665
        The keys are not ordered.
2049
2666
        """
2050
2667
        result = set()
2051
2668
        # Identify all key prefixes.
2052
2669
        # XXX: A bit hacky, needs polish.
2053
 
        if type(self._mapper) == ConstantMapper:
 
2670
        if type(self._mapper) is ConstantMapper:
2054
2671
            prefixes = [()]
2055
2672
        else:
2056
2673
            relpaths = set()
2063
2680
            for suffix in self._kndx_cache[prefix][1]:
2064
2681
                result.add(prefix + (suffix,))
2065
2682
        return result
2066
 
    
 
2683
 
2067
2684
    def _load_prefixes(self, prefixes):
2068
2685
        """Load the indices for prefixes."""
2069
2686
        self._check_read()
2088
2705
                    del self._history
2089
2706
                except NoSuchFile:
2090
2707
                    self._kndx_cache[prefix] = ({}, [])
2091
 
                    if type(self._mapper) == ConstantMapper:
 
2708
                    if type(self._mapper) is ConstantMapper:
2092
2709
                        # preserve behaviour for revisions.kndx etc.
2093
2710
                        self._init_index(path)
2094
2711
                    del self._cache
2107
2724
 
2108
2725
    def _dictionary_compress(self, keys):
2109
2726
        """Dictionary compress keys.
2110
 
        
 
2727
 
2111
2728
        :param keys: The keys to generate references to.
2112
2729
        :return: A string representation of keys. keys which are present are
2113
2730
            dictionary compressed, and others are emitted as fulltext with a
2161
2778
            return index_memo[0][:-1], index_memo[1]
2162
2779
        return keys.sort(key=get_sort_key)
2163
2780
 
 
2781
    _get_total_build_size = _get_total_build_size
 
2782
 
2164
2783
    def _split_key(self, key):
2165
2784
        """Split key into a prefix and suffix."""
2166
2785
        return key[:-1], key[-1]
2167
2786
 
2168
2787
 
 
2788
class _KeyRefs(object):
 
2789
 
 
2790
    def __init__(self, track_new_keys=False):
 
2791
        # dict mapping 'key' to 'set of keys referring to that key'
 
2792
        self.refs = {}
 
2793
        if track_new_keys:
 
2794
            # set remembering all new keys
 
2795
            self.new_keys = set()
 
2796
        else:
 
2797
            self.new_keys = None
 
2798
 
 
2799
    def clear(self):
 
2800
        if self.refs:
 
2801
            self.refs.clear()
 
2802
        if self.new_keys:
 
2803
            self.new_keys.clear()
 
2804
 
 
2805
    def add_references(self, key, refs):
 
2806
        # Record the new references
 
2807
        for referenced in refs:
 
2808
            try:
 
2809
                needed_by = self.refs[referenced]
 
2810
            except KeyError:
 
2811
                needed_by = self.refs[referenced] = set()
 
2812
            needed_by.add(key)
 
2813
        # Discard references satisfied by the new key
 
2814
        self.add_key(key)
 
2815
 
 
2816
    def get_new_keys(self):
 
2817
        return self.new_keys
 
2818
    
 
2819
    def get_unsatisfied_refs(self):
 
2820
        return self.refs.iterkeys()
 
2821
 
 
2822
    def _satisfy_refs_for_key(self, key):
 
2823
        try:
 
2824
            del self.refs[key]
 
2825
        except KeyError:
 
2826
            # No keys depended on this key.  That's ok.
 
2827
            pass
 
2828
 
 
2829
    def add_key(self, key):
 
2830
        # satisfy refs for key, and remember that we've seen this key.
 
2831
        self._satisfy_refs_for_key(key)
 
2832
        if self.new_keys is not None:
 
2833
            self.new_keys.add(key)
 
2834
 
 
2835
    def satisfy_refs_for_keys(self, keys):
 
2836
        for key in keys:
 
2837
            self._satisfy_refs_for_key(key)
 
2838
 
 
2839
    def get_referrers(self):
 
2840
        result = set()
 
2841
        for referrers in self.refs.itervalues():
 
2842
            result.update(referrers)
 
2843
        return result
 
2844
 
 
2845
 
2169
2846
class _KnitGraphIndex(object):
2170
2847
    """A KnitVersionedFiles index layered on GraphIndex."""
2171
2848
 
2172
2849
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2173
 
        add_callback=None):
 
2850
        add_callback=None, track_external_parent_refs=False):
2174
2851
        """Construct a KnitGraphIndex on a graph_index.
2175
2852
 
2176
2853
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
2177
2854
        :param is_locked: A callback to check whether the object should answer
2178
2855
            queries.
2179
2856
        :param deltas: Allow delta-compressed records.
2180
 
        :param parents: If True, record knits parents, if not do not record 
 
2857
        :param parents: If True, record knits parents, if not do not record
2181
2858
            parents.
2182
2859
        :param add_callback: If not None, allow additions to the index and call
2183
2860
            this callback with a list of added GraphIndex nodes:
2184
2861
            [(node, value, node_refs), ...]
2185
2862
        :param is_locked: A callback, returns True if the index is locked and
2186
2863
            thus usable.
 
2864
        :param track_external_parent_refs: If True, record all external parent
 
2865
            references parents from added records.  These can be retrieved
 
2866
            later by calling get_missing_parents().
2187
2867
        """
2188
2868
        self._add_callback = add_callback
2189
2869
        self._graph_index = graph_index
2196
2876
                "parent tracking.")
2197
2877
        self.has_graph = parents
2198
2878
        self._is_locked = is_locked
 
2879
        self._missing_compression_parents = set()
 
2880
        if track_external_parent_refs:
 
2881
            self._key_dependencies = _KeyRefs()
 
2882
        else:
 
2883
            self._key_dependencies = None
2199
2884
 
2200
2885
    def __repr__(self):
2201
2886
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2202
2887
 
2203
 
    def add_records(self, records, random_id=False):
 
2888
    def add_records(self, records, random_id=False,
 
2889
        missing_compression_parents=False):
2204
2890
        """Add multiple records to the index.
2205
 
        
 
2891
 
2206
2892
        This function does not insert data into the Immutable GraphIndex
2207
2893
        backing the KnitGraphIndex, instead it prepares data for insertion by
2208
2894
        the caller and checks that it is safe to insert then calls
2212
2898
                         (key, options, access_memo, parents).
2213
2899
        :param random_id: If True the ids being added were randomly generated
2214
2900
            and no check for existence will be performed.
 
2901
        :param missing_compression_parents: If True the records being added are
 
2902
            only compressed against texts already in the index (or inside
 
2903
            records). If False the records all refer to unavailable texts (or
 
2904
            texts inside records) as compression parents.
2215
2905
        """
2216
2906
        if not self._add_callback:
2217
2907
            raise errors.ReadOnlyError(self)
2219
2909
        # anymore.
2220
2910
 
2221
2911
        keys = {}
 
2912
        compression_parents = set()
 
2913
        key_dependencies = self._key_dependencies
2222
2914
        for (key, options, access_memo, parents) in records:
2223
2915
            if self._parents:
2224
2916
                parents = tuple(parents)
 
2917
                if key_dependencies is not None:
 
2918
                    key_dependencies.add_references(key, parents)
2225
2919
            index, pos, size = access_memo
2226
2920
            if 'no-eol' in options:
2227
2921
                value = 'N'
2235
2929
                if self._deltas:
2236
2930
                    if 'line-delta' in options:
2237
2931
                        node_refs = (parents, (parents[0],))
 
2932
                        if missing_compression_parents:
 
2933
                            compression_parents.add(parents[0])
2238
2934
                    else:
2239
2935
                        node_refs = (parents, ())
2240
2936
                else:
2249
2945
        if not random_id:
2250
2946
            present_nodes = self._get_entries(keys)
2251
2947
            for (index, key, value, node_refs) in present_nodes:
 
2948
                parents = node_refs[:1]
 
2949
                # Sometimes these are passed as a list rather than a tuple
 
2950
                passed = static_tuple.as_tuples(keys[key])
 
2951
                passed_parents = passed[1][:1]
2252
2952
                if (value[0] != keys[key][0][0] or
2253
 
                    node_refs[:1] != keys[key][1][:1]):
 
2953
                    parents != passed_parents):
 
2954
                    node_refs = static_tuple.as_tuples(node_refs)
2254
2955
                    raise KnitCorrupt(self, "inconsistent details in add_records"
2255
 
                        ": %s %s" % ((value, node_refs), keys[key]))
 
2956
                        ": %s %s" % ((value, node_refs), passed))
2256
2957
                del keys[key]
2257
2958
        result = []
2258
2959
        if self._parents:
2262
2963
            for key, (value, node_refs) in keys.iteritems():
2263
2964
                result.append((key, value))
2264
2965
        self._add_callback(result)
2265
 
        
 
2966
        if missing_compression_parents:
 
2967
            # This may appear to be incorrect (it does not check for
 
2968
            # compression parents that are in the existing graph index),
 
2969
            # but such records won't have been buffered, so this is
 
2970
            # actually correct: every entry when
 
2971
            # missing_compression_parents==True either has a missing parent, or
 
2972
            # a parent that is one of the keys in records.
 
2973
            compression_parents.difference_update(keys)
 
2974
            self._missing_compression_parents.update(compression_parents)
 
2975
        # Adding records may have satisfied missing compression parents.
 
2976
        self._missing_compression_parents.difference_update(keys)
 
2977
 
 
2978
    def scan_unvalidated_index(self, graph_index):
 
2979
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2980
 
 
2981
        This allows this _KnitGraphIndex to keep track of any missing
 
2982
        compression parents we may want to have filled in to make those
 
2983
        indices valid.
 
2984
 
 
2985
        :param graph_index: A GraphIndex
 
2986
        """
 
2987
        if self._deltas:
 
2988
            new_missing = graph_index.external_references(ref_list_num=1)
 
2989
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2990
            self._missing_compression_parents.update(new_missing)
 
2991
        if self._key_dependencies is not None:
 
2992
            # Add parent refs from graph_index (and discard parent refs that
 
2993
            # the graph_index has).
 
2994
            for node in graph_index.iter_all_entries():
 
2995
                self._key_dependencies.add_references(node[1], node[3][0])
 
2996
 
 
2997
    def get_missing_compression_parents(self):
 
2998
        """Return the keys of missing compression parents.
 
2999
 
 
3000
        Missing compression parents occur when a record stream was missing
 
3001
        basis texts, or a index was scanned that had missing basis texts.
 
3002
        """
 
3003
        return frozenset(self._missing_compression_parents)
 
3004
 
 
3005
    def get_missing_parents(self):
 
3006
        """Return the keys of missing parents."""
 
3007
        # If updating this, you should also update
 
3008
        # groupcompress._GCGraphIndex.get_missing_parents
 
3009
        # We may have false positives, so filter those out.
 
3010
        self._key_dependencies.satisfy_refs_for_keys(
 
3011
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
3012
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
3013
 
2266
3014
    def _check_read(self):
2267
3015
        """raise if reads are not permitted."""
2268
3016
        if not self._is_locked():
2328
3076
 
2329
3077
    def _get_entries(self, keys, check_present=False):
2330
3078
        """Get the entries for keys.
2331
 
        
 
3079
 
2332
3080
        :param keys: An iterable of index key tuples.
2333
3081
        """
2334
3082
        keys = set(keys)
2376
3124
            options.append('no-eol')
2377
3125
        return options
2378
3126
 
 
3127
    def find_ancestry(self, keys):
 
3128
        """See CombinedGraphIndex.find_ancestry()"""
 
3129
        return self._graph_index.find_ancestry(keys, 0)
 
3130
 
2379
3131
    def get_parent_map(self, keys):
2380
3132
        """Get a map of the parents of keys.
2381
3133
 
2396
3148
 
2397
3149
    def get_position(self, key):
2398
3150
        """Return details needed to access the version.
2399
 
        
 
3151
 
2400
3152
        :return: a tuple (index, data position, size) to hand to the access
2401
3153
            logic to get the record.
2402
3154
        """
2407
3159
 
2408
3160
    def keys(self):
2409
3161
        """Get all the keys in the collection.
2410
 
        
 
3162
 
2411
3163
        The keys are not ordered.
2412
3164
        """
2413
3165
        self._check_read()
2414
3166
        return [node[1] for node in self._graph_index.iter_all_entries()]
2415
 
    
 
3167
 
2416
3168
    missing_keys = _mod_index._missing_keys_from_parent_map
2417
3169
 
2418
3170
    def _node_to_position(self, node):
2440
3192
            return positions[key][1]
2441
3193
        return keys.sort(key=get_index_memo)
2442
3194
 
 
3195
    _get_total_build_size = _get_total_build_size
 
3196
 
2443
3197
 
2444
3198
class _KnitKeyAccess(object):
2445
3199
    """Access to records in .knit files."""
2466
3220
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
2467
3221
            length), where the key is the record key.
2468
3222
        """
2469
 
        if type(raw_data) != str:
 
3223
        if type(raw_data) is not str:
2470
3224
            raise AssertionError(
2471
3225
                'data must be plain bytes was %s' % type(raw_data))
2472
3226
        result = []
2489
3243
            result.append((key, base, size))
2490
3244
        return result
2491
3245
 
 
3246
    def flush(self):
 
3247
        """Flush pending writes on this access object.
 
3248
        
 
3249
        For .knit files this is a no-op.
 
3250
        """
 
3251
        pass
 
3252
 
2492
3253
    def get_raw_records(self, memos_for_retrieval):
2493
3254
        """Get the raw bytes for a records.
2494
3255
 
2519
3280
class _DirectPackAccess(object):
2520
3281
    """Access to data in one or more packs with less translation."""
2521
3282
 
2522
 
    def __init__(self, index_to_packs, reload_func=None):
 
3283
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
2523
3284
        """Create a _DirectPackAccess object.
2524
3285
 
2525
3286
        :param index_to_packs: A dict mapping index objects to the transport
2532
3293
        self._write_index = None
2533
3294
        self._indices = index_to_packs
2534
3295
        self._reload_func = reload_func
 
3296
        self._flush_func = flush_func
2535
3297
 
2536
3298
    def add_raw_records(self, key_sizes, raw_data):
2537
3299
        """Add raw knit bytes to a storage area.
2547
3309
            length), where the index field is the write_index object supplied
2548
3310
            to the PackAccess object.
2549
3311
        """
2550
 
        if type(raw_data) != str:
 
3312
        if type(raw_data) is not str:
2551
3313
            raise AssertionError(
2552
3314
                'data must be plain bytes was %s' % type(raw_data))
2553
3315
        result = []
2559
3321
            result.append((self._write_index, p_offset, p_length))
2560
3322
        return result
2561
3323
 
 
3324
    def flush(self):
 
3325
        """Flush pending writes on this access object.
 
3326
 
 
3327
        This will flush any buffered writes to a NewPack.
 
3328
        """
 
3329
        if self._flush_func is not None:
 
3330
            self._flush_func()
 
3331
            
2562
3332
    def get_raw_records(self, memos_for_retrieval):
2563
3333
        """Get the raw bytes for a records.
2564
3334
 
2565
 
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
3335
        :param memos_for_retrieval: An iterable containing the (index, pos,
2566
3336
            length) memo for retrieving the bytes. The Pack access method
2567
3337
            looks up the pack to use for a given record in its index_to_pack
2568
3338
            map.
2658
3428
    recommended.
2659
3429
    """
2660
3430
    annotator = _KnitAnnotator(knit)
2661
 
    return iter(annotator.annotate(revision_id))
2662
 
 
2663
 
 
2664
 
class _KnitAnnotator(object):
 
3431
    return iter(annotator.annotate_flat(revision_id))
 
3432
 
 
3433
 
 
3434
class _KnitAnnotator(annotate.Annotator):
2665
3435
    """Build up the annotations for a text."""
2666
3436
 
2667
 
    def __init__(self, knit):
2668
 
        self._knit = knit
2669
 
 
2670
 
        # Content objects, differs from fulltexts because of how final newlines
2671
 
        # are treated by knits. the content objects here will always have a
2672
 
        # final newline
2673
 
        self._fulltext_contents = {}
2674
 
 
2675
 
        # Annotated lines of specific revisions
2676
 
        self._annotated_lines = {}
2677
 
 
2678
 
        # Track the raw data for nodes that we could not process yet.
2679
 
        # This maps the revision_id of the base to a list of children that will
2680
 
        # annotated from it.
2681
 
        self._pending_children = {}
2682
 
 
2683
 
        # Nodes which cannot be extracted
2684
 
        self._ghosts = set()
2685
 
 
2686
 
        # Track how many children this node has, so we know if we need to keep
2687
 
        # it
2688
 
        self._annotate_children = {}
2689
 
        self._compression_children = {}
 
3437
    def __init__(self, vf):
 
3438
        annotate.Annotator.__init__(self, vf)
 
3439
 
 
3440
        # TODO: handle Nodes which cannot be extracted
 
3441
        # self._ghosts = set()
 
3442
 
 
3443
        # Map from (key, parent_key) => matching_blocks, should be 'use once'
 
3444
        self._matching_blocks = {}
 
3445
 
 
3446
        # KnitContent objects
 
3447
        self._content_objects = {}
 
3448
        # The number of children that depend on this fulltext content object
 
3449
        self._num_compression_children = {}
 
3450
        # Delta records that need their compression parent before they can be
 
3451
        # expanded
 
3452
        self._pending_deltas = {}
 
3453
        # Fulltext records that are waiting for their parents fulltexts before
 
3454
        # they can be yielded for annotation
 
3455
        self._pending_annotation = {}
2690
3456
 
2691
3457
        self._all_build_details = {}
2692
 
        # The children => parent revision_id graph
2693
 
        self._revision_id_graph = {}
2694
 
 
2695
 
        self._heads_provider = None
2696
 
 
2697
 
        self._nodes_to_keep_annotations = set()
2698
 
        self._generations_until_keep = 100
2699
 
 
2700
 
    def set_generations_until_keep(self, value):
2701
 
        """Set the number of generations before caching a node.
2702
 
 
2703
 
        Setting this to -1 will cache every merge node, setting this higher
2704
 
        will cache fewer nodes.
2705
 
        """
2706
 
        self._generations_until_keep = value
2707
 
 
2708
 
    def _add_fulltext_content(self, revision_id, content_obj):
2709
 
        self._fulltext_contents[revision_id] = content_obj
2710
 
        # TODO: jam 20080305 It might be good to check the sha1digest here
2711
 
        return content_obj.text()
2712
 
 
2713
 
    def _check_parents(self, child, nodes_to_annotate):
2714
 
        """Check if all parents have been processed.
2715
 
 
2716
 
        :param child: A tuple of (rev_id, parents, raw_content)
2717
 
        :param nodes_to_annotate: If child is ready, add it to
2718
 
            nodes_to_annotate, otherwise put it back in self._pending_children
2719
 
        """
2720
 
        for parent_id in child[1]:
2721
 
            if (parent_id not in self._annotated_lines):
2722
 
                # This parent is present, but another parent is missing
2723
 
                self._pending_children.setdefault(parent_id,
2724
 
                                                  []).append(child)
2725
 
                break
2726
 
        else:
2727
 
            # This one is ready to be processed
2728
 
            nodes_to_annotate.append(child)
2729
 
 
2730
 
    def _add_annotation(self, revision_id, fulltext, parent_ids,
2731
 
                        left_matching_blocks=None):
2732
 
        """Add an annotation entry.
2733
 
 
2734
 
        All parents should already have been annotated.
2735
 
        :return: A list of children that now have their parents satisfied.
2736
 
        """
2737
 
        a = self._annotated_lines
2738
 
        annotated_parent_lines = [a[p] for p in parent_ids]
2739
 
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
2740
 
            fulltext, revision_id, left_matching_blocks,
2741
 
            heads_provider=self._get_heads_provider()))
2742
 
        self._annotated_lines[revision_id] = annotated_lines
2743
 
        for p in parent_ids:
2744
 
            ann_children = self._annotate_children[p]
2745
 
            ann_children.remove(revision_id)
2746
 
            if (not ann_children
2747
 
                and p not in self._nodes_to_keep_annotations):
2748
 
                del self._annotated_lines[p]
2749
 
                del self._all_build_details[p]
2750
 
                if p in self._fulltext_contents:
2751
 
                    del self._fulltext_contents[p]
2752
 
        # Now that we've added this one, see if there are any pending
2753
 
        # deltas to be done, certainly this parent is finished
2754
 
        nodes_to_annotate = []
2755
 
        for child in self._pending_children.pop(revision_id, []):
2756
 
            self._check_parents(child, nodes_to_annotate)
2757
 
        return nodes_to_annotate
2758
3458
 
2759
3459
    def _get_build_graph(self, key):
2760
3460
        """Get the graphs for building texts and annotations.
2765
3465
        fulltext.)
2766
3466
 
2767
3467
        :return: A list of (key, index_memo) records, suitable for
2768
 
            passing to read_records_iter to start reading in the raw data fro/
 
3468
            passing to read_records_iter to start reading in the raw data from
2769
3469
            the pack file.
2770
3470
        """
2771
 
        if key in self._annotated_lines:
2772
 
            # Nothing to do
2773
 
            return []
2774
3471
        pending = set([key])
2775
3472
        records = []
2776
 
        generation = 0
2777
 
        kept_generation = 0
 
3473
        ann_keys = set()
 
3474
        self._num_needed_children[key] = 1
2778
3475
        while pending:
2779
3476
            # get all pending nodes
2780
 
            generation += 1
2781
3477
            this_iteration = pending
2782
 
            build_details = self._knit._index.get_build_details(this_iteration)
 
3478
            build_details = self._vf._index.get_build_details(this_iteration)
2783
3479
            self._all_build_details.update(build_details)
2784
 
            # new_nodes = self._knit._index._get_entries(this_iteration)
 
3480
            # new_nodes = self._vf._index._get_entries(this_iteration)
2785
3481
            pending = set()
2786
3482
            for key, details in build_details.iteritems():
2787
 
                (index_memo, compression_parent, parents,
 
3483
                (index_memo, compression_parent, parent_keys,
2788
3484
                 record_details) = details
2789
 
                self._revision_id_graph[key] = parents
 
3485
                self._parent_map[key] = parent_keys
 
3486
                self._heads_provider = None
2790
3487
                records.append((key, index_memo))
2791
3488
                # Do we actually need to check _annotated_lines?
2792
 
                pending.update(p for p in parents
2793
 
                                 if p not in self._all_build_details)
 
3489
                pending.update([p for p in parent_keys
 
3490
                                   if p not in self._all_build_details])
 
3491
                if parent_keys:
 
3492
                    for parent_key in parent_keys:
 
3493
                        if parent_key in self._num_needed_children:
 
3494
                            self._num_needed_children[parent_key] += 1
 
3495
                        else:
 
3496
                            self._num_needed_children[parent_key] = 1
2794
3497
                if compression_parent:
2795
 
                    self._compression_children.setdefault(compression_parent,
2796
 
                        []).append(key)
2797
 
                if parents:
2798
 
                    for parent in parents:
2799
 
                        self._annotate_children.setdefault(parent,
2800
 
                            []).append(key)
2801
 
                    num_gens = generation - kept_generation
2802
 
                    if ((num_gens >= self._generations_until_keep)
2803
 
                        and len(parents) > 1):
2804
 
                        kept_generation = generation
2805
 
                        self._nodes_to_keep_annotations.add(key)
 
3498
                    if compression_parent in self._num_compression_children:
 
3499
                        self._num_compression_children[compression_parent] += 1
 
3500
                    else:
 
3501
                        self._num_compression_children[compression_parent] = 1
2806
3502
 
2807
3503
            missing_versions = this_iteration.difference(build_details.keys())
2808
 
            self._ghosts.update(missing_versions)
2809
 
            for missing_version in missing_versions:
2810
 
                # add a key, no parents
2811
 
                self._revision_id_graph[missing_version] = ()
2812
 
                pending.discard(missing_version) # don't look for it
2813
 
        if self._ghosts.intersection(self._compression_children):
2814
 
            raise KnitCorrupt(
2815
 
                "We cannot have nodes which have a ghost compression parent:\n"
2816
 
                "ghosts: %r\n"
2817
 
                "compression children: %r"
2818
 
                % (self._ghosts, self._compression_children))
2819
 
        # Cleanout anything that depends on a ghost so that we don't wait for
2820
 
        # the ghost to show up
2821
 
        for node in self._ghosts:
2822
 
            if node in self._annotate_children:
2823
 
                # We won't be building this node
2824
 
                del self._annotate_children[node]
 
3504
            if missing_versions:
 
3505
                for key in missing_versions:
 
3506
                    if key in self._parent_map and key in self._text_cache:
 
3507
                        # We already have this text ready, we just need to
 
3508
                        # yield it later so we get it annotated
 
3509
                        ann_keys.add(key)
 
3510
                        parent_keys = self._parent_map[key]
 
3511
                        for parent_key in parent_keys:
 
3512
                            if parent_key in self._num_needed_children:
 
3513
                                self._num_needed_children[parent_key] += 1
 
3514
                            else:
 
3515
                                self._num_needed_children[parent_key] = 1
 
3516
                        pending.update([p for p in parent_keys
 
3517
                                           if p not in self._all_build_details])
 
3518
                    else:
 
3519
                        raise errors.RevisionNotPresent(key, self._vf)
2825
3520
        # Generally we will want to read the records in reverse order, because
2826
3521
        # we find the parent nodes after the children
2827
3522
        records.reverse()
2828
 
        return records
2829
 
 
2830
 
    def _annotate_records(self, records):
2831
 
        """Build the annotations for the listed records."""
 
3523
        return records, ann_keys
 
3524
 
 
3525
    def _get_needed_texts(self, key, pb=None):
 
3526
        # if True or len(self._vf._fallback_vfs) > 0:
 
3527
        if len(self._vf._fallback_vfs) > 0:
 
3528
            # If we have fallbacks, go to the generic path
 
3529
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
 
3530
                yield v
 
3531
            return
 
3532
        while True:
 
3533
            try:
 
3534
                records, ann_keys = self._get_build_graph(key)
 
3535
                for idx, (sub_key, text, num_lines) in enumerate(
 
3536
                                                self._extract_texts(records)):
 
3537
                    if pb is not None:
 
3538
                        pb.update('annotating', idx, len(records))
 
3539
                    yield sub_key, text, num_lines
 
3540
                for sub_key in ann_keys:
 
3541
                    text = self._text_cache[sub_key]
 
3542
                    num_lines = len(text) # bad assumption
 
3543
                    yield sub_key, text, num_lines
 
3544
                return
 
3545
            except errors.RetryWithNewPacks, e:
 
3546
                self._vf._access.reload_or_raise(e)
 
3547
                # The cached build_details are no longer valid
 
3548
                self._all_build_details.clear()
 
3549
 
 
3550
    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
 
3551
        parent_lines = self._text_cache[compression_parent]
 
3552
        blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
 
3553
        self._matching_blocks[(key, compression_parent)] = blocks
 
3554
 
 
3555
    def _expand_record(self, key, parent_keys, compression_parent, record,
 
3556
                       record_details):
 
3557
        delta = None
 
3558
        if compression_parent:
 
3559
            if compression_parent not in self._content_objects:
 
3560
                # Waiting for the parent
 
3561
                self._pending_deltas.setdefault(compression_parent, []).append(
 
3562
                    (key, parent_keys, record, record_details))
 
3563
                return None
 
3564
            # We have the basis parent, so expand the delta
 
3565
            num = self._num_compression_children[compression_parent]
 
3566
            num -= 1
 
3567
            if num == 0:
 
3568
                base_content = self._content_objects.pop(compression_parent)
 
3569
                self._num_compression_children.pop(compression_parent)
 
3570
            else:
 
3571
                self._num_compression_children[compression_parent] = num
 
3572
                base_content = self._content_objects[compression_parent]
 
3573
            # It is tempting to want to copy_base_content=False for the last
 
3574
            # child object. However, whenever noeol=False,
 
3575
            # self._text_cache[parent_key] is content._lines. So mutating it
 
3576
            # gives very bad results.
 
3577
            # The alternative is to copy the lines into text cache, but then we
 
3578
            # are copying anyway, so just do it here.
 
3579
            content, delta = self._vf._factory.parse_record(
 
3580
                key, record, record_details, base_content,
 
3581
                copy_base_content=True)
 
3582
        else:
 
3583
            # Fulltext record
 
3584
            content, _ = self._vf._factory.parse_record(
 
3585
                key, record, record_details, None)
 
3586
        if self._num_compression_children.get(key, 0) > 0:
 
3587
            self._content_objects[key] = content
 
3588
        lines = content.text()
 
3589
        self._text_cache[key] = lines
 
3590
        if delta is not None:
 
3591
            self._cache_delta_blocks(key, compression_parent, delta, lines)
 
3592
        return lines
 
3593
 
 
3594
    def _get_parent_annotations_and_matches(self, key, text, parent_key):
 
3595
        """Get the list of annotations for the parent, and the matching lines.
 
3596
 
 
3597
        :param text: The opaque value given by _get_needed_texts
 
3598
        :param parent_key: The key for the parent text
 
3599
        :return: (parent_annotations, matching_blocks)
 
3600
            parent_annotations is a list as long as the number of lines in
 
3601
                parent
 
3602
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
 
3603
                indicating which lines match between the two texts
 
3604
        """
 
3605
        block_key = (key, parent_key)
 
3606
        if block_key in self._matching_blocks:
 
3607
            blocks = self._matching_blocks.pop(block_key)
 
3608
            parent_annotations = self._annotations_cache[parent_key]
 
3609
            return parent_annotations, blocks
 
3610
        return annotate.Annotator._get_parent_annotations_and_matches(self,
 
3611
            key, text, parent_key)
 
3612
 
 
3613
    def _process_pending(self, key):
 
3614
        """The content for 'key' was just processed.
 
3615
 
 
3616
        Determine if there is any more pending work to be processed.
 
3617
        """
 
3618
        to_return = []
 
3619
        if key in self._pending_deltas:
 
3620
            compression_parent = key
 
3621
            children = self._pending_deltas.pop(key)
 
3622
            for child_key, parent_keys, record, record_details in children:
 
3623
                lines = self._expand_record(child_key, parent_keys,
 
3624
                                            compression_parent,
 
3625
                                            record, record_details)
 
3626
                if self._check_ready_for_annotations(child_key, parent_keys):
 
3627
                    to_return.append(child_key)
 
3628
        # Also check any children that are waiting for this parent to be
 
3629
        # annotation ready
 
3630
        if key in self._pending_annotation:
 
3631
            children = self._pending_annotation.pop(key)
 
3632
            to_return.extend([c for c, p_keys in children
 
3633
                              if self._check_ready_for_annotations(c, p_keys)])
 
3634
        return to_return
 
3635
 
 
3636
    def _check_ready_for_annotations(self, key, parent_keys):
 
3637
        """return true if this text is ready to be yielded.
 
3638
 
 
3639
        Otherwise, this will return False, and queue the text into
 
3640
        self._pending_annotation
 
3641
        """
 
3642
        for parent_key in parent_keys:
 
3643
            if parent_key not in self._annotations_cache:
 
3644
                # still waiting on at least one parent text, so queue it up
 
3645
                # Note that if there are multiple parents, we need to wait
 
3646
                # for all of them.
 
3647
                self._pending_annotation.setdefault(parent_key,
 
3648
                    []).append((key, parent_keys))
 
3649
                return False
 
3650
        return True
 
3651
 
 
3652
    def _extract_texts(self, records):
 
3653
        """Extract the various texts needed based on records"""
2832
3654
        # We iterate in the order read, rather than a strict order requested
2833
3655
        # However, process what we can, and put off to the side things that
2834
3656
        # still need parents, cleaning them up when those parents are
2835
3657
        # processed.
2836
 
        for (rev_id, record,
2837
 
             digest) in self._knit._read_records_iter(records):
2838
 
            if rev_id in self._annotated_lines:
 
3658
        # Basic data flow:
 
3659
        #   1) As 'records' are read, see if we can expand these records into
 
3660
        #      Content objects (and thus lines)
 
3661
        #   2) If a given line-delta is waiting on its compression parent, it
 
3662
        #      gets queued up into self._pending_deltas, otherwise we expand
 
3663
        #      it, and put it into self._text_cache and self._content_objects
 
3664
        #   3) If we expanded the text, we will then check to see if all
 
3665
        #      parents have also been processed. If so, this text gets yielded,
 
3666
        #      else this record gets set aside into pending_annotation
 
3667
        #   4) Further, if we expanded the text in (2), we will then check to
 
3668
        #      see if there are any children in self._pending_deltas waiting to
 
3669
        #      also be processed. If so, we go back to (2) for those
 
3670
        #   5) Further again, if we yielded the text, we can then check if that
 
3671
        #      'unlocks' any of the texts in pending_annotations, which should
 
3672
        #      then get yielded as well
 
3673
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
 
3674
        # compression child could unlock yet another, and yielding a fulltext
 
3675
        # will also 'unlock' the children that are waiting on that annotation.
 
3676
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
 
3677
        # if other parents are also waiting.)
 
3678
        # We want to yield content before expanding child content objects, so
 
3679
        # that we know when we can re-use the content lines, and the annotation
 
3680
        # code can know when it can stop caching fulltexts, as well.
 
3681
 
 
3682
        # Children that are missing their compression parent
 
3683
        pending_deltas = {}
 
3684
        for (key, record, digest) in self._vf._read_records_iter(records):
 
3685
            # ghosts?
 
3686
            details = self._all_build_details[key]
 
3687
            (_, compression_parent, parent_keys, record_details) = details
 
3688
            lines = self._expand_record(key, parent_keys, compression_parent,
 
3689
                                        record, record_details)
 
3690
            if lines is None:
 
3691
                # Pending delta should be queued up
2839
3692
                continue
2840
 
            parent_ids = self._revision_id_graph[rev_id]
2841
 
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
2842
 
            details = self._all_build_details[rev_id]
2843
 
            (index_memo, compression_parent, parents,
2844
 
             record_details) = details
2845
 
            nodes_to_annotate = []
2846
 
            # TODO: Remove the punning between compression parents, and
2847
 
            #       parent_ids, we should be able to do this without assuming
2848
 
            #       the build order
2849
 
            if len(parent_ids) == 0:
2850
 
                # There are no parents for this node, so just add it
2851
 
                # TODO: This probably needs to be decoupled
2852
 
                fulltext_content, delta = self._knit._factory.parse_record(
2853
 
                    rev_id, record, record_details, None)
2854
 
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
2855
 
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
2856
 
                    parent_ids, left_matching_blocks=None))
2857
 
            else:
2858
 
                child = (rev_id, parent_ids, record)
2859
 
                # Check if all the parents are present
2860
 
                self._check_parents(child, nodes_to_annotate)
2861
 
            while nodes_to_annotate:
2862
 
                # Should we use a queue here instead of a stack?
2863
 
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
2864
 
                (index_memo, compression_parent, parents,
2865
 
                 record_details) = self._all_build_details[rev_id]
2866
 
                blocks = None
2867
 
                if compression_parent is not None:
2868
 
                    comp_children = self._compression_children[compression_parent]
2869
 
                    if rev_id not in comp_children:
2870
 
                        raise AssertionError("%r not in compression children %r"
2871
 
                            % (rev_id, comp_children))
2872
 
                    # If there is only 1 child, it is safe to reuse this
2873
 
                    # content
2874
 
                    reuse_content = (len(comp_children) == 1
2875
 
                        and compression_parent not in
2876
 
                            self._nodes_to_keep_annotations)
2877
 
                    if reuse_content:
2878
 
                        # Remove it from the cache since it will be changing
2879
 
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
2880
 
                        # Make sure to copy the fulltext since it might be
2881
 
                        # modified
2882
 
                        parent_fulltext = list(parent_fulltext_content.text())
2883
 
                    else:
2884
 
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
2885
 
                        parent_fulltext = parent_fulltext_content.text()
2886
 
                    comp_children.remove(rev_id)
2887
 
                    fulltext_content, delta = self._knit._factory.parse_record(
2888
 
                        rev_id, record, record_details,
2889
 
                        parent_fulltext_content,
2890
 
                        copy_base_content=(not reuse_content))
2891
 
                    fulltext = self._add_fulltext_content(rev_id,
2892
 
                                                          fulltext_content)
2893
 
                    if compression_parent == parent_ids[0]:
2894
 
                        # the compression_parent is the left parent, so we can
2895
 
                        # re-use the delta
2896
 
                        blocks = KnitContent.get_line_delta_blocks(delta,
2897
 
                                parent_fulltext, fulltext)
2898
 
                else:
2899
 
                    fulltext_content = self._knit._factory.parse_fulltext(
2900
 
                        record, rev_id)
2901
 
                    fulltext = self._add_fulltext_content(rev_id,
2902
 
                        fulltext_content)
2903
 
                nodes_to_annotate.extend(
2904
 
                    self._add_annotation(rev_id, fulltext, parent_ids,
2905
 
                                     left_matching_blocks=blocks))
2906
 
 
2907
 
    def _get_heads_provider(self):
2908
 
        """Create a heads provider for resolving ancestry issues."""
2909
 
        if self._heads_provider is not None:
2910
 
            return self._heads_provider
2911
 
        parent_provider = _mod_graph.DictParentsProvider(
2912
 
            self._revision_id_graph)
2913
 
        graph_obj = _mod_graph.Graph(parent_provider)
2914
 
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
2915
 
        self._heads_provider = head_cache
2916
 
        return head_cache
2917
 
 
2918
 
    def annotate(self, key):
2919
 
        """Return the annotated fulltext at the given key.
2920
 
 
2921
 
        :param key: The key to annotate.
2922
 
        """
2923
 
        if len(self._knit._fallback_vfs) > 0:
2924
 
            # stacked knits can't use the fast path at present.
2925
 
            return self._simple_annotate(key)
2926
 
        while True:
2927
 
            try:
2928
 
                records = self._get_build_graph(key)
2929
 
                if key in self._ghosts:
2930
 
                    raise errors.RevisionNotPresent(key, self._knit)
2931
 
                self._annotate_records(records)
2932
 
                return self._annotated_lines[key]
2933
 
            except errors.RetryWithNewPacks, e:
2934
 
                self._knit._access.reload_or_raise(e)
2935
 
                # The cached build_details are no longer valid
2936
 
                self._all_build_details.clear()
2937
 
 
2938
 
    def _simple_annotate(self, key):
2939
 
        """Return annotated fulltext, rediffing from the full texts.
2940
 
 
2941
 
        This is slow but makes no assumptions about the repository
2942
 
        being able to produce line deltas.
2943
 
        """
2944
 
        # TODO: this code generates a parent maps of present ancestors; it
2945
 
        # could be split out into a separate method, and probably should use
2946
 
        # iter_ancestry instead. -- mbp and robertc 20080704
2947
 
        graph = _mod_graph.Graph(self._knit)
2948
 
        head_cache = _mod_graph.FrozenHeadsCache(graph)
2949
 
        search = graph._make_breadth_first_searcher([key])
2950
 
        keys = set()
2951
 
        while True:
2952
 
            try:
2953
 
                present, ghosts = search.next_with_ghosts()
2954
 
            except StopIteration:
2955
 
                break
2956
 
            keys.update(present)
2957
 
        parent_map = self._knit.get_parent_map(keys)
2958
 
        parent_cache = {}
2959
 
        reannotate = annotate.reannotate
2960
 
        for record in self._knit.get_record_stream(keys, 'topological', True):
2961
 
            key = record.key
2962
 
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
2963
 
            parents = parent_map[key]
2964
 
            if parents is not None:
2965
 
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]
2966
 
            else:
2967
 
                parent_lines = []
2968
 
            parent_cache[key] = list(
2969
 
                reannotate(parent_lines, fulltext, key, None, head_cache))
2970
 
        try:
2971
 
            return parent_cache[key]
2972
 
        except KeyError, e:
2973
 
            raise errors.RevisionNotPresent(key, self._knit)
2974
 
 
 
3693
            # At this point, we may be able to yield this content, if all
 
3694
            # parents are also finished
 
3695
            yield_this_text = self._check_ready_for_annotations(key,
 
3696
                                                                parent_keys)
 
3697
            if yield_this_text:
 
3698
                # All parents present
 
3699
                yield key, lines, len(lines)
 
3700
            to_process = self._process_pending(key)
 
3701
            while to_process:
 
3702
                this_process = to_process
 
3703
                to_process = []
 
3704
                for key in this_process:
 
3705
                    lines = self._text_cache[key]
 
3706
                    yield key, lines, len(lines)
 
3707
                    to_process.extend(self._process_pending(key))
2975
3708
 
2976
3709
try:
2977
 
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
2978
 
except ImportError:
 
3710
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
 
3711
except ImportError, e:
 
3712
    osutils.failed_to_load_extension(e)
2979
3713
    from bzrlib._knit_load_data_py import _load_data_py as _load_data