/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Martin Pool
  • Date: 2009-03-03 03:01:49 UTC
  • mfrom: (4070 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4073.
  • Revision ID: mbp@sourcefrog.net-20090303030149-8p8o8hszdtqa7w8f
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
updates.
21
21
 
22
22
Knit file layout:
23
 
lifeless: the data file is made up of "delta records".  each delta record has a delta header 
24
 
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of 
25
 
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a 
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
26
26
end-marker; simply "end VERSION"
27
27
 
28
28
delta can be line or full contents.a
35
35
130,130,2
36
36
8         if elt.get('executable') == 'yes':
37
37
8             ie.executable = True
38
 
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
39
39
 
40
40
 
41
41
whats in an index:
51
51
 
52
52
"""
53
53
 
54
 
# TODOS:
55
 
# 10:16 < lifeless> make partial index writes safe
56
 
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
57
 
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
58
 
#                    always' approach.
59
 
# move sha1 out of the content so that join is faster at verifying parents
60
 
# record content length ?
61
 
                  
62
54
 
63
55
from cStringIO import StringIO
64
56
from itertools import izip, chain
131
123
 
132
124
DATA_SUFFIX = '.knit'
133
125
INDEX_SUFFIX = '.kndx'
 
126
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
134
127
 
135
128
 
136
129
class KnitAdapter(object):
138
131
 
139
132
    def __init__(self, basis_vf):
140
133
        """Create an adapter which accesses full texts from basis_vf.
141
 
        
 
134
 
142
135
        :param basis_vf: A versioned file to access basis texts of deltas from.
143
136
            May be None for adapters that do not need to access basis texts.
144
137
        """
151
144
class FTAnnotatedToUnannotated(KnitAdapter):
152
145
    """An adapter from FT annotated knits to unannotated ones."""
153
146
 
154
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
147
    def get_bytes(self, factory):
 
148
        annotated_compressed_bytes = factory._raw_record
155
149
        rec, contents = \
156
150
            self._data._parse_record_unchecked(annotated_compressed_bytes)
157
151
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
162
156
class DeltaAnnotatedToUnannotated(KnitAdapter):
163
157
    """An adapter for deltas from annotated to unannotated."""
164
158
 
165
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
159
    def get_bytes(self, factory):
 
160
        annotated_compressed_bytes = factory._raw_record
166
161
        rec, contents = \
167
162
            self._data._parse_record_unchecked(annotated_compressed_bytes)
168
163
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
175
170
class FTAnnotatedToFullText(KnitAdapter):
176
171
    """An adapter from FT annotated knits to unannotated ones."""
177
172
 
178
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
173
    def get_bytes(self, factory):
 
174
        annotated_compressed_bytes = factory._raw_record
179
175
        rec, contents = \
180
176
            self._data._parse_record_unchecked(annotated_compressed_bytes)
181
177
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
186
182
class DeltaAnnotatedToFullText(KnitAdapter):
187
183
    """An adapter for deltas from annotated to unannotated."""
188
184
 
189
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
185
    def get_bytes(self, factory):
 
186
        annotated_compressed_bytes = factory._raw_record
190
187
        rec, contents = \
191
188
            self._data._parse_record_unchecked(annotated_compressed_bytes)
192
189
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
209
206
class FTPlainToFullText(KnitAdapter):
210
207
    """An adapter from FT plain knits to unannotated ones."""
211
208
 
212
 
    def get_bytes(self, factory, compressed_bytes):
 
209
    def get_bytes(self, factory):
 
210
        compressed_bytes = factory._raw_record
213
211
        rec, contents = \
214
212
            self._data._parse_record_unchecked(compressed_bytes)
215
213
        content, delta = self._plain_factory.parse_record(factory.key[-1],
220
218
class DeltaPlainToFullText(KnitAdapter):
221
219
    """An adapter for deltas from annotated to unannotated."""
222
220
 
223
 
    def get_bytes(self, factory, compressed_bytes):
 
221
    def get_bytes(self, factory):
 
222
        compressed_bytes = factory._raw_record
224
223
        rec, contents = \
225
224
            self._data._parse_record_unchecked(compressed_bytes)
226
225
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
242
241
 
243
242
class KnitContentFactory(ContentFactory):
244
243
    """Content factory for streaming from knits.
245
 
    
 
244
 
246
245
    :seealso ContentFactory:
247
246
    """
248
247
 
249
248
    def __init__(self, key, parents, build_details, sha1, raw_record,
250
 
        annotated, knit=None):
 
249
        annotated, knit=None, network_bytes=None):
251
250
        """Create a KnitContentFactory for key.
252
 
        
 
251
 
253
252
        :param key: The key.
254
253
        :param parents: The parents.
255
254
        :param build_details: The build details as returned from
257
256
        :param sha1: The sha1 expected from the full text of this object.
258
257
        :param raw_record: The bytes of the knit data from disk.
259
258
        :param annotated: True if the raw data is annotated.
 
259
        :param network_bytes: None to calculate the network bytes on demand,
 
260
            not-none if they are already known.
260
261
        """
261
262
        ContentFactory.__init__(self)
262
263
        self.sha1 = sha1
272
273
            annotated_kind = ''
273
274
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
274
275
        self._raw_record = raw_record
 
276
        self._network_bytes = network_bytes
275
277
        self._build_details = build_details
276
278
        self._knit = knit
277
279
 
 
280
    def _create_network_bytes(self):
 
281
        """Create a fully serialised network version for transmission."""
 
282
        # storage_kind, key, parents, Noeol, raw_record
 
283
        key_bytes = '\x00'.join(self.key)
 
284
        if self.parents is None:
 
285
            parent_bytes = 'None:'
 
286
        else:
 
287
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
288
        if self._build_details[1]:
 
289
            noeol = 'N'
 
290
        else:
 
291
            noeol = ' '
 
292
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
293
            parent_bytes, noeol, self._raw_record)
 
294
        self._network_bytes = network_bytes
 
295
 
278
296
    def get_bytes_as(self, storage_kind):
279
297
        if storage_kind == self.storage_kind:
280
 
            return self._raw_record
 
298
            if self._network_bytes is None:
 
299
                self._create_network_bytes()
 
300
            return self._network_bytes
281
301
        if self._knit is not None:
282
302
            if storage_kind == 'chunked':
283
303
                return self._knit.get_lines(self.key[0])
287
307
            self.storage_kind)
288
308
 
289
309
 
 
310
class LazyKnitContentFactory(ContentFactory):
 
311
    """A ContentFactory which can either generate full text or a wire form.
 
312
 
 
313
    :seealso ContentFactory:
 
314
    """
 
315
 
 
316
    def __init__(self, key, parents, generator, first):
 
317
        """Create a LazyKnitContentFactory.
 
318
 
 
319
        :param key: The key of the record.
 
320
        :param parents: The parents of the record.
 
321
        :param generator: A _ContentMapGenerator containing the record for this
 
322
            key.
 
323
        :param first: Is this the first content object returned from generator?
 
324
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
325
            knit-delta-closure-ref
 
326
        """
 
327
        self.key = key
 
328
        self.parents = parents
 
329
        self.sha1 = None
 
330
        self._generator = generator
 
331
        self.storage_kind = "knit-delta-closure"
 
332
        if not first:
 
333
            self.storage_kind = self.storage_kind + "-ref"
 
334
        self._first = first
 
335
 
 
336
    def get_bytes_as(self, storage_kind):
 
337
        if storage_kind == self.storage_kind:
 
338
            if self._first:
 
339
                return self._generator._wire_bytes()
 
340
            else:
 
341
                # all the keys etc are contained in the bytes returned in the
 
342
                # first record.
 
343
                return ''
 
344
        if storage_kind in ('chunked', 'fulltext'):
 
345
            chunks = self._generator._get_one_work(self.key).text()
 
346
            if storage_kind == 'chunked':
 
347
                return chunks
 
348
            else:
 
349
                return ''.join(chunks)
 
350
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
351
            self.storage_kind)
 
352
 
 
353
 
 
354
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
355
    """Convert a network record to a iterator over stream records.
 
356
 
 
357
    :param storage_kind: The storage kind of the record.
 
358
        Must be 'knit-delta-closure'.
 
359
    :param bytes: The bytes of the record on the network.
 
360
    """
 
361
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
362
    return generator.get_record_stream()
 
363
 
 
364
 
 
365
def knit_network_to_record(storage_kind, bytes, line_end):
 
366
    """Convert a network record to a record object.
 
367
 
 
368
    :param storage_kind: The storage kind of the record.
 
369
    :param bytes: The bytes of the record on the network.
 
370
    """
 
371
    start = line_end
 
372
    line_end = bytes.find('\n', start)
 
373
    key = tuple(bytes[start:line_end].split('\x00'))
 
374
    start = line_end + 1
 
375
    line_end = bytes.find('\n', start)
 
376
    parent_line = bytes[start:line_end]
 
377
    if parent_line == 'None:':
 
378
        parents = None
 
379
    else:
 
380
        parents = tuple(
 
381
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
382
             if segment])
 
383
    start = line_end + 1
 
384
    noeol = bytes[start] == 'N'
 
385
    if 'ft' in storage_kind:
 
386
        method = 'fulltext'
 
387
    else:
 
388
        method = 'line-delta'
 
389
    build_details = (method, noeol)
 
390
    start = start + 1
 
391
    raw_record = bytes[start:]
 
392
    annotated = 'annotated' in storage_kind
 
393
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
394
        annotated, network_bytes=bytes)]
 
395
 
 
396
 
290
397
class KnitContent(object):
291
398
    """Content of a knit version to which deltas can be applied.
292
 
    
 
399
 
293
400
    This is always stored in memory as a list of lines with \n at the end,
294
 
    plus a flag saying if the final ending is really there or not, because that 
 
401
    plus a flag saying if the final ending is really there or not, because that
295
402
    corresponds to the on-disk knit representation.
296
403
    """
297
404
 
386
493
 
387
494
class PlainKnitContent(KnitContent):
388
495
    """Unannotated content.
389
 
    
 
496
 
390
497
    When annotate[_iter] is called on this content, the same version is reported
391
498
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
392
499
    objects.
647
754
 
648
755
    This is only functional enough to run interface tests, it doesn't try to
649
756
    provide a full pack environment.
650
 
    
 
757
 
651
758
    :param annotated: knit annotations are wanted.
652
759
    :param mapper: The mapper from keys to paths.
653
760
    """
663
770
 
664
771
    This is only functional enough to run interface tests, it doesn't try to
665
772
    provide a full pack environment.
666
 
    
 
773
 
667
774
    :param graph: Store a graph.
668
775
    :param delta: Delta compress contents.
669
776
    :param keylength: How long should keys be.
700
807
    versioned_files.writer.end()
701
808
 
702
809
 
 
810
def _get_total_build_size(self, keys, positions):
 
811
    """Determine the total bytes to build these keys.
 
812
 
 
813
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
814
    don't inherit from a common base.)
 
815
 
 
816
    :param keys: Keys that we want to build
 
817
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
818
        as returned by _get_components_positions)
 
819
    :return: Number of bytes to build those keys
 
820
    """
 
821
    all_build_index_memos = {}
 
822
    build_keys = keys
 
823
    while build_keys:
 
824
        next_keys = set()
 
825
        for key in build_keys:
 
826
            # This is mostly for the 'stacked' case
 
827
            # Where we will be getting the data from a fallback
 
828
            if key not in positions:
 
829
                continue
 
830
            _, index_memo, compression_parent = positions[key]
 
831
            all_build_index_memos[key] = index_memo
 
832
            if compression_parent not in all_build_index_memos:
 
833
                next_keys.add(compression_parent)
 
834
        build_keys = next_keys
 
835
    return sum([index_memo[2] for index_memo
 
836
                in all_build_index_memos.itervalues()])
 
837
 
 
838
 
703
839
class KnitVersionedFiles(VersionedFiles):
704
840
    """Storage for many versioned files using knit compression.
705
841
 
706
842
    Backend storage is managed by indices and data objects.
707
843
 
708
 
    :ivar _index: A _KnitGraphIndex or similar that can describe the 
709
 
        parents, graph, compression and data location of entries in this 
710
 
        KnitVersionedFiles.  Note that this is only the index for 
 
844
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
845
        parents, graph, compression and data location of entries in this
 
846
        KnitVersionedFiles.  Note that this is only the index for
711
847
        *this* vfs; if there are fallbacks they must be queried separately.
712
848
    """
713
849
 
891
1027
 
892
1028
    def _check_header_version(self, rec, version_id):
893
1029
        """Checks the header version on original format knit records.
894
 
        
 
1030
 
895
1031
        These have the last component of the key embedded in the record.
896
1032
        """
897
1033
        if rec[1] != version_id:
976
1112
            if missing and not allow_missing:
977
1113
                raise errors.RevisionNotPresent(missing.pop(), self)
978
1114
        return component_data
979
 
       
 
1115
 
980
1116
    def _get_content(self, key, parent_texts={}):
981
1117
        """Returns a content object that makes up the specified
982
1118
        version."""
986
1122
            if not self.get_parent_map([key]):
987
1123
                raise RevisionNotPresent(key, self)
988
1124
            return cached_version
989
 
        text_map, contents_map = self._get_content_maps([key])
990
 
        return contents_map[key]
991
 
 
992
 
    def _get_content_maps(self, keys, nonlocal_keys=None):
993
 
        """Produce maps of text and KnitContents
994
 
        
995
 
        :param keys: The keys to produce content maps for.
996
 
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
997
 
            which are known to not be in this knit, but rather in one of the
998
 
            fallback knits.
999
 
        :return: (text_map, content_map) where text_map contains the texts for
1000
 
            the requested versions and content_map contains the KnitContents.
1001
 
        """
1002
 
        # FUTURE: This function could be improved for the 'extract many' case
1003
 
        # by tracking each component and only doing the copy when the number of
1004
 
        # children than need to apply delta's to it is > 1 or it is part of the
1005
 
        # final output.
1006
 
        keys = list(keys)
1007
 
        multiple_versions = len(keys) != 1
1008
 
        record_map = self._get_record_map(keys, allow_missing=True)
1009
 
 
1010
 
        text_map = {}
1011
 
        content_map = {}
1012
 
        final_content = {}
1013
 
        if nonlocal_keys is None:
1014
 
            nonlocal_keys = set()
1015
 
        else:
1016
 
            nonlocal_keys = frozenset(nonlocal_keys)
1017
 
        missing_keys = set(nonlocal_keys)
1018
 
        for source in self._fallback_vfs:
1019
 
            if not missing_keys:
1020
 
                break
1021
 
            for record in source.get_record_stream(missing_keys,
1022
 
                'unordered', True):
1023
 
                if record.storage_kind == 'absent':
1024
 
                    continue
1025
 
                missing_keys.remove(record.key)
1026
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1027
 
                text_map[record.key] = lines
1028
 
                content_map[record.key] = PlainKnitContent(lines, record.key)
1029
 
                if record.key in keys:
1030
 
                    final_content[record.key] = content_map[record.key]
1031
 
        for key in keys:
1032
 
            if key in nonlocal_keys:
1033
 
                # already handled
1034
 
                continue
1035
 
            components = []
1036
 
            cursor = key
1037
 
            while cursor is not None:
1038
 
                try:
1039
 
                    record, record_details, digest, next = record_map[cursor]
1040
 
                except KeyError:
1041
 
                    raise RevisionNotPresent(cursor, self)
1042
 
                components.append((cursor, record, record_details, digest))
1043
 
                cursor = next
1044
 
                if cursor in content_map:
1045
 
                    # no need to plan further back
1046
 
                    components.append((cursor, None, None, None))
1047
 
                    break
1048
 
 
1049
 
            content = None
1050
 
            for (component_id, record, record_details,
1051
 
                 digest) in reversed(components):
1052
 
                if component_id in content_map:
1053
 
                    content = content_map[component_id]
1054
 
                else:
1055
 
                    content, delta = self._factory.parse_record(key[-1],
1056
 
                        record, record_details, content,
1057
 
                        copy_base_content=multiple_versions)
1058
 
                    if multiple_versions:
1059
 
                        content_map[component_id] = content
1060
 
 
1061
 
            final_content[key] = content
1062
 
 
1063
 
            # digest here is the digest from the last applied component.
1064
 
            text = content.text()
1065
 
            actual_sha = sha_strings(text)
1066
 
            if actual_sha != digest:
1067
 
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1068
 
            text_map[key] = text
1069
 
        return text_map, final_content
 
1125
        generator = _VFContentMapGenerator(self, [key])
 
1126
        return generator._get_content(key)
1070
1127
 
1071
1128
    def get_parent_map(self, keys):
1072
1129
        """Get a map of the graph parents of keys.
1102
1159
 
1103
1160
    def _get_record_map(self, keys, allow_missing=False):
1104
1161
        """Produce a dictionary of knit records.
1105
 
        
 
1162
 
1106
1163
        :return: {key:(record, record_details, digest, next)}
1107
1164
            record
1108
 
                data returned from read_records
 
1165
                data returned from read_records (a KnitContentobject)
1109
1166
            record_details
1110
1167
                opaque information to pass to parse_record
1111
1168
            digest
1114
1171
                build-parent of the version, i.e. the leftmost ancestor.
1115
1172
                Will be None if the record is not a delta.
1116
1173
        :param keys: The keys to build a map for
1117
 
        :param allow_missing: If some records are missing, rather than 
 
1174
        :param allow_missing: If some records are missing, rather than
1118
1175
            error, just return the data that could be generated.
1119
1176
        """
 
1177
        raw_map = self._get_record_map_unparsed(keys,
 
1178
            allow_missing=allow_missing)
 
1179
        return self._raw_map_to_record_map(raw_map)
 
1180
 
 
1181
    def _raw_map_to_record_map(self, raw_map):
 
1182
        """Parse the contents of _get_record_map_unparsed.
 
1183
 
 
1184
        :return: see _get_record_map.
 
1185
        """
 
1186
        result = {}
 
1187
        for key in raw_map:
 
1188
            data, record_details, next = raw_map[key]
 
1189
            content, digest = self._parse_record(key[-1], data)
 
1190
            result[key] = content, record_details, digest, next
 
1191
        return result
 
1192
 
 
1193
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1194
        """Get the raw data for reconstructing keys without parsing it.
 
1195
 
 
1196
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1197
            key-> raw_bytes, (method, noeol), compression_parent
 
1198
        """
1120
1199
        # This retries the whole request if anything fails. Potentially we
1121
1200
        # could be a bit more selective. We could track the keys whose records
1122
1201
        # we have successfully found, and then only request the new records
1132
1211
                # n = next
1133
1212
                records = [(key, i_m) for key, (r, i_m, n)
1134
1213
                                       in position_map.iteritems()]
1135
 
                record_map = {}
1136
 
                for key, record, digest in self._read_records_iter(records):
 
1214
                # Sort by the index memo, so that we request records from the
 
1215
                # same pack file together, and in forward-sorted order
 
1216
                records.sort(key=operator.itemgetter(1))
 
1217
                raw_record_map = {}
 
1218
                for key, data in self._read_records_iter_unchecked(records):
1137
1219
                    (record_details, index_memo, next) = position_map[key]
1138
 
                    record_map[key] = record, record_details, digest, next
1139
 
                return record_map
 
1220
                    raw_record_map[key] = data, record_details, next
 
1221
                return raw_record_map
1140
1222
            except errors.RetryWithNewPacks, e:
1141
1223
                self._access.reload_or_raise(e)
1142
1224
 
1143
 
    def _split_by_prefix(self, keys):
 
1225
    @classmethod
 
1226
    def _split_by_prefix(cls, keys):
1144
1227
        """For the given keys, split them up based on their prefix.
1145
1228
 
1146
1229
        To keep memory pressure somewhat under control, split the
1149
1232
        This should be revisited if _get_content_maps() can ever cross
1150
1233
        file-id boundaries.
1151
1234
 
 
1235
        The keys for a given file_id are kept in the same relative order.
 
1236
        Ordering between file_ids is not, though prefix_order will return the
 
1237
        order that the key was first seen.
 
1238
 
1152
1239
        :param keys: An iterable of key tuples
1153
 
        :return: A dict of {prefix: [key_list]}
 
1240
        :return: (split_map, prefix_order)
 
1241
            split_map       A dictionary mapping prefix => keys
 
1242
            prefix_order    The order that we saw the various prefixes
1154
1243
        """
1155
1244
        split_by_prefix = {}
 
1245
        prefix_order = []
1156
1246
        for key in keys:
1157
1247
            if len(key) == 1:
1158
 
                split_by_prefix.setdefault('', []).append(key)
1159
 
            else:
1160
 
                split_by_prefix.setdefault(key[0], []).append(key)
1161
 
        return split_by_prefix
 
1248
                prefix = ''
 
1249
            else:
 
1250
                prefix = key[0]
 
1251
 
 
1252
            if prefix in split_by_prefix:
 
1253
                split_by_prefix[prefix].append(key)
 
1254
            else:
 
1255
                split_by_prefix[prefix] = [key]
 
1256
                prefix_order.append(prefix)
 
1257
        return split_by_prefix, prefix_order
 
1258
 
 
1259
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1260
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1261
        """For the given keys, group them into 'best-sized' requests.
 
1262
 
 
1263
        The idea is to avoid making 1 request per file, but to never try to
 
1264
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1265
        possible, we should try to group requests to the same pack file
 
1266
        together.
 
1267
 
 
1268
        :return: list of (keys, non_local) tuples that indicate what keys
 
1269
            should be fetched next.
 
1270
        """
 
1271
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1272
        #       from the same pack file together, and we want to extract all
 
1273
        #       the texts for a given build-chain together. Ultimately it
 
1274
        #       probably needs a better global view.
 
1275
        total_keys = len(keys)
 
1276
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1277
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1278
        cur_keys = []
 
1279
        cur_non_local = set()
 
1280
        cur_size = 0
 
1281
        result = []
 
1282
        sizes = []
 
1283
        for prefix in prefix_order:
 
1284
            keys = prefix_split_keys[prefix]
 
1285
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1286
 
 
1287
            this_size = self._index._get_total_build_size(keys, positions)
 
1288
            cur_size += this_size
 
1289
            cur_keys.extend(keys)
 
1290
            cur_non_local.update(non_local)
 
1291
            if cur_size > _min_buffer_size:
 
1292
                result.append((cur_keys, cur_non_local))
 
1293
                sizes.append(cur_size)
 
1294
                cur_keys = []
 
1295
                cur_non_local = set()
 
1296
                cur_size = 0
 
1297
        if cur_keys:
 
1298
            result.append((cur_keys, cur_non_local))
 
1299
            sizes.append(cur_size)
 
1300
        trace.mutter('Collapsed %d keys into %d requests w/ %d file_ids'
 
1301
                     ' w/ sizes: %s', total_keys, len(result),
 
1302
                     len(prefix_split_keys), sizes)
 
1303
        return result
1162
1304
 
1163
1305
    def get_record_stream(self, keys, ordering, include_delta_closure):
1164
1306
        """Get a stream of records for keys.
1206
1348
        absent_keys = keys.difference(set(positions))
1207
1349
        # There may be more absent keys : if we're missing the basis component
1208
1350
        # and are trying to include the delta closure.
 
1351
        # XXX: We should not ever need to examine remote sources because we do
 
1352
        # not permit deltas across versioned files boundaries.
1209
1353
        if include_delta_closure:
1210
1354
            needed_from_fallback = set()
1211
1355
            # Build up reconstructable_keys dict.  key:True in this dict means
1283
1427
            # XXX: get_content_maps performs its own index queries; allow state
1284
1428
            # to be passed in.
1285
1429
            non_local_keys = needed_from_fallback - absent_keys
1286
 
            prefix_split_keys = self._split_by_prefix(present_keys)
1287
 
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1288
 
            for prefix, keys in prefix_split_keys.iteritems():
1289
 
                non_local = prefix_split_non_local_keys.get(prefix, [])
1290
 
                non_local = set(non_local)
1291
 
                text_map, _ = self._get_content_maps(keys, non_local)
1292
 
                for key in keys:
1293
 
                    lines = text_map.pop(key)
1294
 
                    yield ChunkedContentFactory(key, global_map[key], None,
1295
 
                                                lines)
 
1430
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1431
                                                                non_local_keys,
 
1432
                                                                positions):
 
1433
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1434
                                                   global_map)
 
1435
                for record in generator.get_record_stream():
 
1436
                    yield record
1296
1437
        else:
1297
1438
            for source, keys in source_keys:
1298
1439
                if source is parent_maps[0]:
1330
1471
    def insert_record_stream(self, stream):
1331
1472
        """Insert a record stream into this container.
1332
1473
 
1333
 
        :param stream: A stream of records to insert. 
 
1474
        :param stream: A stream of records to insert.
1334
1475
        :return: None
1335
1476
        :seealso VersionedFiles.get_record_stream:
1336
1477
        """
1376
1517
        # key = basis_parent, value = index entry to add
1377
1518
        buffered_index_entries = {}
1378
1519
        for record in stream:
 
1520
            buffered = False
1379
1521
            parents = record.parents
1380
1522
            if record.storage_kind in delta_types:
1381
1523
                # TODO: eventually the record itself should track
1408
1550
                    except KeyError:
1409
1551
                        adapter_key = (record.storage_kind, "knit-ft-gz")
1410
1552
                        adapter = get_adapter(adapter_key)
1411
 
                    bytes = adapter.get_bytes(
1412
 
                        record, record.get_bytes_as(record.storage_kind))
 
1553
                    bytes = adapter.get_bytes(record)
1413
1554
                else:
1414
 
                    bytes = record.get_bytes_as(record.storage_kind)
 
1555
                    # It's a knit record, it has a _raw_record field (even if
 
1556
                    # it was reconstituted from a network stream).
 
1557
                    bytes = record._raw_record
1415
1558
                options = [record._build_details[0]]
1416
1559
                if record._build_details[1]:
1417
1560
                    options.append('no-eol')
1426
1569
                access_memo = self._access.add_raw_records(
1427
1570
                    [(record.key, len(bytes))], bytes)[0]
1428
1571
                index_entry = (record.key, options, access_memo, parents)
1429
 
                buffered = False
1430
1572
                if 'fulltext' not in options:
1431
1573
                    # Not a fulltext, so we need to make sure the compression
1432
1574
                    # parent will also be present.
1448
1590
            elif record.storage_kind == 'chunked':
1449
1591
                self.add_lines(record.key, parents,
1450
1592
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1451
 
            elif record.storage_kind == 'fulltext':
1452
 
                self.add_lines(record.key, parents,
1453
 
                    split_lines(record.get_bytes_as('fulltext')))
1454
1593
            else:
1455
 
                # Not a fulltext, and not suitable for direct insertion as a
 
1594
                # Not suitable for direct insertion as a
1456
1595
                # delta, either because it's not the right format, or this
1457
1596
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1458
1597
                # 0) or because it depends on a base only present in the
1459
1598
                # fallback kvfs.
1460
 
                adapter_key = record.storage_kind, 'fulltext'
1461
 
                adapter = get_adapter(adapter_key)
1462
 
                lines = split_lines(adapter.get_bytes(
1463
 
                    record, record.get_bytes_as(record.storage_kind)))
 
1599
                try:
 
1600
                    # Try getting a fulltext directly from the record.
 
1601
                    bytes = record.get_bytes_as('fulltext')
 
1602
                except errors.UnavailableRepresentation:
 
1603
                    adapter_key = record.storage_kind, 'fulltext'
 
1604
                    adapter = get_adapter(adapter_key)
 
1605
                    bytes = adapter.get_bytes(record)
 
1606
                lines = split_lines(bytes)
1464
1607
                try:
1465
1608
                    self.add_lines(record.key, parents, lines)
1466
1609
                except errors.RevisionAlreadyPresent:
1467
1610
                    pass
1468
1611
            # Add any records whose basis parent is now available.
1469
 
            added_keys = [record.key]
1470
 
            while added_keys:
1471
 
                key = added_keys.pop(0)
1472
 
                if key in buffered_index_entries:
1473
 
                    index_entries = buffered_index_entries[key]
1474
 
                    self._index.add_records(index_entries)
1475
 
                    added_keys.extend(
1476
 
                        [index_entry[0] for index_entry in index_entries])
1477
 
                    del buffered_index_entries[key]
1478
 
        # If there were any deltas which had a missing basis parent, error.
 
1612
            if not buffered:
 
1613
                added_keys = [record.key]
 
1614
                while added_keys:
 
1615
                    key = added_keys.pop(0)
 
1616
                    if key in buffered_index_entries:
 
1617
                        index_entries = buffered_index_entries[key]
 
1618
                        self._index.add_records(index_entries)
 
1619
                        added_keys.extend(
 
1620
                            [index_entry[0] for index_entry in index_entries])
 
1621
                        del buffered_index_entries[key]
1479
1622
        if buffered_index_entries:
1480
 
            from pprint import pformat
1481
 
            raise errors.BzrCheckError(
1482
 
                "record_stream refers to compression parents not in %r:\n%s"
1483
 
                % (self, pformat(sorted(buffered_index_entries.keys()))))
 
1623
            # There were index entries buffered at the end of the stream,
 
1624
            # So these need to be added (if the index supports holding such
 
1625
            # entries for later insertion)
 
1626
            for key in buffered_index_entries:
 
1627
                index_entries = buffered_index_entries[key]
 
1628
                self._index.add_records(index_entries,
 
1629
                    missing_compression_parents=True)
 
1630
 
 
1631
    def get_missing_compression_parent_keys(self):
 
1632
        """Return an iterable of keys of missing compression parents.
 
1633
 
 
1634
        Check this after calling insert_record_stream to find out if there are
 
1635
        any missing compression parents.  If there are, the records that
 
1636
        depend on them are not able to be inserted safely. For atomic
 
1637
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1638
        suspended - commit will fail at this point. Nonatomic knits will error
 
1639
        earlier because they have no staging area to put pending entries into.
 
1640
        """
 
1641
        return self._index.get_missing_compression_parents()
1484
1642
 
1485
1643
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1486
1644
        """Iterate over the lines in the versioned files from keys.
1528
1686
                        # fulltext
1529
1687
                        line_iterator = self._factory.get_fulltext_content(data)
1530
1688
                    else:
1531
 
                        # Delta 
 
1689
                        # Delta
1532
1690
                        line_iterator = self._factory.get_linedelta_content(data)
1533
1691
                    # Now that we are yielding the data for this key, remove it
1534
1692
                    # from the list
1545
1703
        # If there are still keys we've not yet found, we look in the fallback
1546
1704
        # vfs, and hope to find them there.  Note that if the keys are found
1547
1705
        # but had no changes or no content, the fallback may not return
1548
 
        # anything.  
 
1706
        # anything.
1549
1707
        if keys and not self._fallback_vfs:
1550
1708
            # XXX: strictly the second parameter is meant to be the file id
1551
1709
            # but it's not easily accessible here.
1573
1731
                           delta=None, annotated=None,
1574
1732
                           left_matching_blocks=None):
1575
1733
        """Merge annotations for content and generate deltas.
1576
 
        
 
1734
 
1577
1735
        This is done by comparing the annotations based on changes to the text
1578
1736
        and generating a delta on the resulting full texts. If annotations are
1579
1737
        not being created then a simple delta is created.
1661
1819
                                 rec[1], record_contents))
1662
1820
        if last_line != 'end %s\n' % rec[1]:
1663
1821
            raise KnitCorrupt(self,
1664
 
                              'unexpected version end line %r, wanted %r' 
 
1822
                              'unexpected version end line %r, wanted %r'
1665
1823
                              % (last_line, rec[1]))
1666
1824
        df.close()
1667
1825
        return rec, record_contents
1684
1842
        if not needed_records:
1685
1843
            return
1686
1844
 
1687
 
        # The transport optimizes the fetching as well 
 
1845
        # The transport optimizes the fetching as well
1688
1846
        # (ie, reads continuous ranges.)
1689
1847
        raw_data = self._access.get_raw_records(
1690
1848
            [index_memo for key, index_memo in needed_records])
1700
1858
        This unpacks enough of the text record to validate the id is
1701
1859
        as expected but thats all.
1702
1860
 
1703
 
        Each item the iterator yields is (key, bytes, sha1_of_full_text).
 
1861
        Each item the iterator yields is (key, bytes,
 
1862
            expected_sha1_of_full_text).
 
1863
        """
 
1864
        for key, data in self._read_records_iter_unchecked(records):
 
1865
            # validate the header (note that we can only use the suffix in
 
1866
            # current knit records).
 
1867
            df, rec = self._parse_record_header(key, data)
 
1868
            df.close()
 
1869
            yield key, data, rec[3]
 
1870
 
 
1871
    def _read_records_iter_unchecked(self, records):
 
1872
        """Read text records from data file and yield raw data.
 
1873
 
 
1874
        No validation is done.
 
1875
 
 
1876
        Yields tuples of (key, data).
1704
1877
        """
1705
1878
        # setup an iterator of the external records:
1706
1879
        # uses readv so nice and fast we hope.
1712
1885
 
1713
1886
        for key, index_memo in records:
1714
1887
            data = raw_records.next()
1715
 
            # validate the header (note that we can only use the suffix in
1716
 
            # current knit records).
1717
 
            df, rec = self._parse_record_header(key, data)
1718
 
            df.close()
1719
 
            yield key, data, rec[3]
 
1888
            yield key, data
1720
1889
 
1721
1890
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1722
1891
        """Convert key, digest, lines into a raw data block.
1723
 
        
 
1892
 
1724
1893
        :param key: The key of the record. Currently keys are always serialised
1725
1894
            using just the trailing component.
1726
1895
        :param dense_lines: The bytes of lines but in a denser form. For
1766
1935
        return result
1767
1936
 
1768
1937
 
 
1938
class _ContentMapGenerator(object):
 
1939
    """Generate texts or expose raw deltas for a set of texts."""
 
1940
 
 
1941
    def _get_content(self, key):
 
1942
        """Get the content object for key."""
 
1943
        # Note that _get_content is only called when the _ContentMapGenerator
 
1944
        # has been constructed with just one key requested for reconstruction.
 
1945
        if key in self.nonlocal_keys:
 
1946
            record = self.get_record_stream().next()
 
1947
            # Create a content object on the fly
 
1948
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1949
            return PlainKnitContent(lines, record.key)
 
1950
        else:
 
1951
            # local keys we can ask for directly
 
1952
            return self._get_one_work(key)
 
1953
 
 
1954
    def get_record_stream(self):
 
1955
        """Get a record stream for the keys requested during __init__."""
 
1956
        for record in self._work():
 
1957
            yield record
 
1958
 
 
1959
    def _work(self):
 
1960
        """Produce maps of text and KnitContents as dicts.
 
1961
 
 
1962
        :return: (text_map, content_map) where text_map contains the texts for
 
1963
            the requested versions and content_map contains the KnitContents.
 
1964
        """
 
1965
        # NB: By definition we never need to read remote sources unless texts
 
1966
        # are requested from them: we don't delta across stores - and we
 
1967
        # explicitly do not want to to prevent data loss situations.
 
1968
        if self.global_map is None:
 
1969
            self.global_map = self.vf.get_parent_map(self.keys)
 
1970
        nonlocal_keys = self.nonlocal_keys
 
1971
 
 
1972
        missing_keys = set(nonlocal_keys)
 
1973
        # Read from remote versioned file instances and provide to our caller.
 
1974
        for source in self.vf._fallback_vfs:
 
1975
            if not missing_keys:
 
1976
                break
 
1977
            # Loop over fallback repositories asking them for texts - ignore
 
1978
            # any missing from a particular fallback.
 
1979
            for record in source.get_record_stream(missing_keys,
 
1980
                'unordered', True):
 
1981
                if record.storage_kind == 'absent':
 
1982
                    # Not in thie particular stream, may be in one of the
 
1983
                    # other fallback vfs objects.
 
1984
                    continue
 
1985
                missing_keys.remove(record.key)
 
1986
                yield record
 
1987
 
 
1988
        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
 
1989
            allow_missing=True)
 
1990
        first = True
 
1991
        for key in self.keys:
 
1992
            if key in self.nonlocal_keys:
 
1993
                continue
 
1994
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
1995
            first = False
 
1996
 
 
1997
    def _get_one_work(self, requested_key):
 
1998
        # Now, if we have calculated everything already, just return the
 
1999
        # desired text.
 
2000
        if requested_key in self._contents_map:
 
2001
            return self._contents_map[requested_key]
 
2002
        # To simplify things, parse everything at once - code that wants one text
 
2003
        # probably wants them all.
 
2004
        # FUTURE: This function could be improved for the 'extract many' case
 
2005
        # by tracking each component and only doing the copy when the number of
 
2006
        # children than need to apply delta's to it is > 1 or it is part of the
 
2007
        # final output.
 
2008
        multiple_versions = len(self.keys) != 1
 
2009
        if self._record_map is None:
 
2010
            self._record_map = self.vf._raw_map_to_record_map(
 
2011
                self._raw_record_map)
 
2012
        record_map = self._record_map
 
2013
        # raw_record_map is key:
 
2014
        # Have read and parsed records at this point.
 
2015
        for key in self.keys:
 
2016
            if key in self.nonlocal_keys:
 
2017
                # already handled
 
2018
                continue
 
2019
            components = []
 
2020
            cursor = key
 
2021
            while cursor is not None:
 
2022
                try:
 
2023
                    record, record_details, digest, next = record_map[cursor]
 
2024
                except KeyError:
 
2025
                    raise RevisionNotPresent(cursor, self)
 
2026
                components.append((cursor, record, record_details, digest))
 
2027
                cursor = next
 
2028
                if cursor in self._contents_map:
 
2029
                    # no need to plan further back
 
2030
                    components.append((cursor, None, None, None))
 
2031
                    break
 
2032
 
 
2033
            content = None
 
2034
            for (component_id, record, record_details,
 
2035
                 digest) in reversed(components):
 
2036
                if component_id in self._contents_map:
 
2037
                    content = self._contents_map[component_id]
 
2038
                else:
 
2039
                    content, delta = self._factory.parse_record(key[-1],
 
2040
                        record, record_details, content,
 
2041
                        copy_base_content=multiple_versions)
 
2042
                    if multiple_versions:
 
2043
                        self._contents_map[component_id] = content
 
2044
 
 
2045
            # digest here is the digest from the last applied component.
 
2046
            text = content.text()
 
2047
            actual_sha = sha_strings(text)
 
2048
            if actual_sha != digest:
 
2049
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2050
        if multiple_versions:
 
2051
            return self._contents_map[requested_key]
 
2052
        else:
 
2053
            return content
 
2054
 
 
2055
    def _wire_bytes(self):
 
2056
        """Get the bytes to put on the wire for 'key'.
 
2057
 
 
2058
        The first collection of bytes asked for returns the serialised
 
2059
        raw_record_map and the additional details (key, parent) for key.
 
2060
        Subsequent calls return just the additional details (key, parent).
 
2061
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2062
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2063
 
 
2064
        :param key: A key from the content generator.
 
2065
        :return: Bytes to put on the wire.
 
2066
        """
 
2067
        lines = []
 
2068
        # kind marker for dispatch on the far side,
 
2069
        lines.append('knit-delta-closure')
 
2070
        # Annotated or not
 
2071
        if self.vf._factory.annotated:
 
2072
            lines.append('annotated')
 
2073
        else:
 
2074
            lines.append('')
 
2075
        # then the list of keys
 
2076
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
2077
            if key not in self.nonlocal_keys]))
 
2078
        # then the _raw_record_map in serialised form:
 
2079
        map_byte_list = []
 
2080
        # for each item in the map:
 
2081
        # 1 line with key
 
2082
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2083
        # one line with method
 
2084
        # one line with noeol
 
2085
        # one line with next ('' for None)
 
2086
        # one line with byte count of the record bytes
 
2087
        # the record bytes
 
2088
        for key, (record_bytes, (method, noeol), next) in \
 
2089
            self._raw_record_map.iteritems():
 
2090
            key_bytes = '\x00'.join(key)
 
2091
            parents = self.global_map.get(key, None)
 
2092
            if parents is None:
 
2093
                parent_bytes = 'None:'
 
2094
            else:
 
2095
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2096
            method_bytes = method
 
2097
            if noeol:
 
2098
                noeol_bytes = "T"
 
2099
            else:
 
2100
                noeol_bytes = "F"
 
2101
            if next:
 
2102
                next_bytes = '\x00'.join(next)
 
2103
            else:
 
2104
                next_bytes = ''
 
2105
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2106
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2107
                len(record_bytes), record_bytes))
 
2108
        map_bytes = ''.join(map_byte_list)
 
2109
        lines.append(map_bytes)
 
2110
        bytes = '\n'.join(lines)
 
2111
        return bytes
 
2112
 
 
2113
 
 
2114
class _VFContentMapGenerator(_ContentMapGenerator):
 
2115
    """Content map generator reading from a VersionedFiles object."""
 
2116
 
 
2117
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2118
        global_map=None, raw_record_map=None):
 
2119
        """Create a _ContentMapGenerator.
 
2120
 
 
2121
        :param versioned_files: The versioned files that the texts are being
 
2122
            extracted from.
 
2123
        :param keys: The keys to produce content maps for.
 
2124
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2125
            which are known to not be in this knit, but rather in one of the
 
2126
            fallback knits.
 
2127
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2128
            This is required if get_record_stream() is to be used.
 
2129
        :param raw_record_map: A unparsed raw record map to use for answering
 
2130
            contents.
 
2131
        """
 
2132
        # The vf to source data from
 
2133
        self.vf = versioned_files
 
2134
        # The keys desired
 
2135
        self.keys = list(keys)
 
2136
        # Keys known to be in fallback vfs objects
 
2137
        if nonlocal_keys is None:
 
2138
            self.nonlocal_keys = set()
 
2139
        else:
 
2140
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2141
        # Parents data for keys to be returned in get_record_stream
 
2142
        self.global_map = global_map
 
2143
        # The chunked lists for self.keys in text form
 
2144
        self._text_map = {}
 
2145
        # A cache of KnitContent objects used in extracting texts.
 
2146
        self._contents_map = {}
 
2147
        # All the knit records needed to assemble the requested keys as full
 
2148
        # texts.
 
2149
        self._record_map = None
 
2150
        if raw_record_map is None:
 
2151
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2152
                allow_missing=True)
 
2153
        else:
 
2154
            self._raw_record_map = raw_record_map
 
2155
        # the factory for parsing records
 
2156
        self._factory = self.vf._factory
 
2157
 
 
2158
 
 
2159
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2160
    """Content map generator sourced from a network stream."""
 
2161
 
 
2162
    def __init__(self, bytes, line_end):
 
2163
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2164
        self._bytes = bytes
 
2165
        self.global_map = {}
 
2166
        self._raw_record_map = {}
 
2167
        self._contents_map = {}
 
2168
        self._record_map = None
 
2169
        self.nonlocal_keys = []
 
2170
        # Get access to record parsing facilities
 
2171
        self.vf = KnitVersionedFiles(None, None)
 
2172
        start = line_end
 
2173
        # Annotated or not
 
2174
        line_end = bytes.find('\n', start)
 
2175
        line = bytes[start:line_end]
 
2176
        start = line_end + 1
 
2177
        if line == 'annotated':
 
2178
            self._factory = KnitAnnotateFactory()
 
2179
        else:
 
2180
            self._factory = KnitPlainFactory()
 
2181
        # list of keys to emit in get_record_stream
 
2182
        line_end = bytes.find('\n', start)
 
2183
        line = bytes[start:line_end]
 
2184
        start = line_end + 1
 
2185
        self.keys = [
 
2186
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2187
            if segment]
 
2188
        # now a loop until the end. XXX: It would be nice if this was just a
 
2189
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2190
        # there is a decent sized gap stopping that at the moment.
 
2191
        end = len(bytes)
 
2192
        while start < end:
 
2193
            # 1 line with key
 
2194
            line_end = bytes.find('\n', start)
 
2195
            key = tuple(bytes[start:line_end].split('\x00'))
 
2196
            start = line_end + 1
 
2197
            # 1 line with parents (None: for None, '' for ())
 
2198
            line_end = bytes.find('\n', start)
 
2199
            line = bytes[start:line_end]
 
2200
            if line == 'None:':
 
2201
                parents = None
 
2202
            else:
 
2203
                parents = tuple(
 
2204
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2205
                     if segment])
 
2206
            self.global_map[key] = parents
 
2207
            start = line_end + 1
 
2208
            # one line with method
 
2209
            line_end = bytes.find('\n', start)
 
2210
            line = bytes[start:line_end]
 
2211
            method = line
 
2212
            start = line_end + 1
 
2213
            # one line with noeol
 
2214
            line_end = bytes.find('\n', start)
 
2215
            line = bytes[start:line_end]
 
2216
            noeol = line == "T"
 
2217
            start = line_end + 1
 
2218
            # one line with next ('' for None)
 
2219
            line_end = bytes.find('\n', start)
 
2220
            line = bytes[start:line_end]
 
2221
            if not line:
 
2222
                next = None
 
2223
            else:
 
2224
                next = tuple(bytes[start:line_end].split('\x00'))
 
2225
            start = line_end + 1
 
2226
            # one line with byte count of the record bytes
 
2227
            line_end = bytes.find('\n', start)
 
2228
            line = bytes[start:line_end]
 
2229
            count = int(line)
 
2230
            start = line_end + 1
 
2231
            # the record bytes
 
2232
            record_bytes = bytes[start:start+count]
 
2233
            start = start + count
 
2234
            # put it in the map
 
2235
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2236
 
 
2237
    def get_record_stream(self):
 
2238
        """Get a record stream for for keys requested by the bytestream."""
 
2239
        first = True
 
2240
        for key in self.keys:
 
2241
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2242
            first = False
 
2243
 
 
2244
    def _wire_bytes(self):
 
2245
        return self._bytes
 
2246
 
 
2247
 
1769
2248
class _KndxIndex(object):
1770
2249
    """Manages knit index files
1771
2250
 
1785
2264
 
1786
2265
    Duplicate entries may be written to the index for a single version id
1787
2266
    if this is done then the latter one completely replaces the former:
1788
 
    this allows updates to correct version and parent information. 
 
2267
    this allows updates to correct version and parent information.
1789
2268
    Note that the two entries may share the delta, and that successive
1790
2269
    annotations and references MUST point to the first entry.
1791
2270
 
1792
2271
    The index file on disc contains a header, followed by one line per knit
1793
2272
    record. The same revision can be present in an index file more than once.
1794
 
    The first occurrence gets assigned a sequence number starting from 0. 
1795
 
    
 
2273
    The first occurrence gets assigned a sequence number starting from 0.
 
2274
 
1796
2275
    The format of a single line is
1797
2276
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
1798
2277
    REVISION_ID is a utf8-encoded revision id
1799
 
    FLAGS is a comma separated list of flags about the record. Values include 
 
2278
    FLAGS is a comma separated list of flags about the record. Values include
1800
2279
        no-eol, line-delta, fulltext.
1801
2280
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
1802
2281
        that the the compressed data starts at.
1806
2285
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
1807
2286
        revision id already in the knit that is a parent of REVISION_ID.
1808
2287
    The ' :' marker is the end of record marker.
1809
 
    
 
2288
 
1810
2289
    partial writes:
1811
2290
    when a write is interrupted to the index file, it will result in a line
1812
2291
    that does not end in ' :'. If the ' :' is not present at the end of a line,
1837
2316
        self._reset_cache()
1838
2317
        self.has_graph = True
1839
2318
 
1840
 
    def add_records(self, records, random_id=False):
 
2319
    def add_records(self, records, random_id=False, missing_compression_parents=False):
1841
2320
        """Add multiple records to the index.
1842
 
        
 
2321
 
1843
2322
        :param records: a list of tuples:
1844
2323
                         (key, options, access_memo, parents).
1845
2324
        :param random_id: If True the ids being added were randomly generated
1846
2325
            and no check for existence will be performed.
 
2326
        :param missing_compression_parents: If True the records being added are
 
2327
            only compressed against texts already in the index (or inside
 
2328
            records). If False the records all refer to unavailable texts (or
 
2329
            texts inside records) as compression parents.
1847
2330
        """
 
2331
        if missing_compression_parents:
 
2332
            # It might be nice to get the edge of the records. But keys isn't
 
2333
            # _wrong_.
 
2334
            keys = sorted(record[0] for record in records)
 
2335
            raise errors.RevisionNotPresent(keys, self)
1848
2336
        paths = {}
1849
2337
        for record in records:
1850
2338
            key = record[0]
1881
2369
                self._kndx_cache[prefix] = (orig_cache, orig_history)
1882
2370
                raise
1883
2371
 
 
2372
    def scan_unvalidated_index(self, graph_index):
 
2373
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2374
        # Because kndx files do not support atomic insertion via separate index
 
2375
        # files, they do not support this method.
 
2376
        raise NotImplementedError(self.scan_unvalidated_index)
 
2377
 
 
2378
    def get_missing_compression_parents(self):
 
2379
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2380
        # Because kndx files do not support atomic insertion via separate index
 
2381
        # files, they do not support this method.
 
2382
        raise NotImplementedError(self.get_missing_compression_parents)
 
2383
 
1884
2384
    def _cache_key(self, key, options, pos, size, parent_keys):
1885
2385
        """Cache a version record in the history array and index cache.
1886
2386
 
2019
2519
 
2020
2520
    def get_position(self, key):
2021
2521
        """Return details needed to access the version.
2022
 
        
 
2522
 
2023
2523
        :return: a tuple (key, data position, size) to hand to the access
2024
2524
            logic to get the record.
2025
2525
        """
2029
2529
        return key, entry[2], entry[3]
2030
2530
 
2031
2531
    has_key = _mod_index._has_key_from_parent_map
2032
 
    
 
2532
 
2033
2533
    def _init_index(self, path, extra_lines=[]):
2034
2534
        """Initialize an index."""
2035
2535
        sio = StringIO()
2044
2544
 
2045
2545
    def keys(self):
2046
2546
        """Get all the keys in the collection.
2047
 
        
 
2547
 
2048
2548
        The keys are not ordered.
2049
2549
        """
2050
2550
        result = set()
2063
2563
            for suffix in self._kndx_cache[prefix][1]:
2064
2564
                result.add(prefix + (suffix,))
2065
2565
        return result
2066
 
    
 
2566
 
2067
2567
    def _load_prefixes(self, prefixes):
2068
2568
        """Load the indices for prefixes."""
2069
2569
        self._check_read()
2107
2607
 
2108
2608
    def _dictionary_compress(self, keys):
2109
2609
        """Dictionary compress keys.
2110
 
        
 
2610
 
2111
2611
        :param keys: The keys to generate references to.
2112
2612
        :return: A string representation of keys. keys which are present are
2113
2613
            dictionary compressed, and others are emitted as fulltext with a
2161
2661
            return index_memo[0][:-1], index_memo[1]
2162
2662
        return keys.sort(key=get_sort_key)
2163
2663
 
 
2664
    _get_total_build_size = _get_total_build_size
 
2665
 
2164
2666
    def _split_key(self, key):
2165
2667
        """Split key into a prefix and suffix."""
2166
2668
        return key[:-1], key[-1]
2177
2679
        :param is_locked: A callback to check whether the object should answer
2178
2680
            queries.
2179
2681
        :param deltas: Allow delta-compressed records.
2180
 
        :param parents: If True, record knits parents, if not do not record 
 
2682
        :param parents: If True, record knits parents, if not do not record
2181
2683
            parents.
2182
2684
        :param add_callback: If not None, allow additions to the index and call
2183
2685
            this callback with a list of added GraphIndex nodes:
2196
2698
                "parent tracking.")
2197
2699
        self.has_graph = parents
2198
2700
        self._is_locked = is_locked
 
2701
        self._missing_compression_parents = set()
2199
2702
 
2200
2703
    def __repr__(self):
2201
2704
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2202
2705
 
2203
 
    def add_records(self, records, random_id=False):
 
2706
    def add_records(self, records, random_id=False,
 
2707
        missing_compression_parents=False):
2204
2708
        """Add multiple records to the index.
2205
 
        
 
2709
 
2206
2710
        This function does not insert data into the Immutable GraphIndex
2207
2711
        backing the KnitGraphIndex, instead it prepares data for insertion by
2208
2712
        the caller and checks that it is safe to insert then calls
2212
2716
                         (key, options, access_memo, parents).
2213
2717
        :param random_id: If True the ids being added were randomly generated
2214
2718
            and no check for existence will be performed.
 
2719
        :param missing_compression_parents: If True the records being added are
 
2720
            only compressed against texts already in the index (or inside
 
2721
            records). If False the records all refer to unavailable texts (or
 
2722
            texts inside records) as compression parents.
2215
2723
        """
2216
2724
        if not self._add_callback:
2217
2725
            raise errors.ReadOnlyError(self)
2219
2727
        # anymore.
2220
2728
 
2221
2729
        keys = {}
 
2730
        compression_parents = set()
2222
2731
        for (key, options, access_memo, parents) in records:
2223
2732
            if self._parents:
2224
2733
                parents = tuple(parents)
2235
2744
                if self._deltas:
2236
2745
                    if 'line-delta' in options:
2237
2746
                        node_refs = (parents, (parents[0],))
 
2747
                        if missing_compression_parents:
 
2748
                            compression_parents.add(parents[0])
2238
2749
                    else:
2239
2750
                        node_refs = (parents, ())
2240
2751
                else:
2262
2773
            for key, (value, node_refs) in keys.iteritems():
2263
2774
                result.append((key, value))
2264
2775
        self._add_callback(result)
2265
 
        
 
2776
        if missing_compression_parents:
 
2777
            # This may appear to be incorrect (it does not check for
 
2778
            # compression parents that are in the existing graph index),
 
2779
            # but such records won't have been buffered, so this is
 
2780
            # actually correct: every entry when
 
2781
            # missing_compression_parents==True either has a missing parent, or
 
2782
            # a parent that is one of the keys in records.
 
2783
            compression_parents.difference_update(keys)
 
2784
            self._missing_compression_parents.update(compression_parents)
 
2785
        # Adding records may have satisfied missing compression parents.
 
2786
        self._missing_compression_parents.difference_update(keys)
 
2787
 
 
2788
    def scan_unvalidated_index(self, graph_index):
 
2789
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2790
 
 
2791
        This allows this _KnitGraphIndex to keep track of any missing
 
2792
        compression parents we may want to have filled in to make those
 
2793
        indices valid.
 
2794
 
 
2795
        :param graph_index: A GraphIndex
 
2796
        """
 
2797
        if self._deltas:
 
2798
            new_missing = graph_index.external_references(ref_list_num=1)
 
2799
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2800
            self._missing_compression_parents.update(new_missing)
 
2801
 
 
2802
    def get_missing_compression_parents(self):
 
2803
        """Return the keys of missing compression parents.
 
2804
 
 
2805
        Missing compression parents occur when a record stream was missing
 
2806
        basis texts, or a index was scanned that had missing basis texts.
 
2807
        """
 
2808
        return frozenset(self._missing_compression_parents)
 
2809
 
2266
2810
    def _check_read(self):
2267
2811
        """raise if reads are not permitted."""
2268
2812
        if not self._is_locked():
2328
2872
 
2329
2873
    def _get_entries(self, keys, check_present=False):
2330
2874
        """Get the entries for keys.
2331
 
        
 
2875
 
2332
2876
        :param keys: An iterable of index key tuples.
2333
2877
        """
2334
2878
        keys = set(keys)
2396
2940
 
2397
2941
    def get_position(self, key):
2398
2942
        """Return details needed to access the version.
2399
 
        
 
2943
 
2400
2944
        :return: a tuple (index, data position, size) to hand to the access
2401
2945
            logic to get the record.
2402
2946
        """
2407
2951
 
2408
2952
    def keys(self):
2409
2953
        """Get all the keys in the collection.
2410
 
        
 
2954
 
2411
2955
        The keys are not ordered.
2412
2956
        """
2413
2957
        self._check_read()
2414
2958
        return [node[1] for node in self._graph_index.iter_all_entries()]
2415
 
    
 
2959
 
2416
2960
    missing_keys = _mod_index._missing_keys_from_parent_map
2417
2961
 
2418
2962
    def _node_to_position(self, node):
2440
2984
            return positions[key][1]
2441
2985
        return keys.sort(key=get_index_memo)
2442
2986
 
 
2987
    _get_total_build_size = _get_total_build_size
 
2988
 
2443
2989
 
2444
2990
class _KnitKeyAccess(object):
2445
2991
    """Access to records in .knit files."""
2562
3108
    def get_raw_records(self, memos_for_retrieval):
2563
3109
        """Get the raw bytes for a records.
2564
3110
 
2565
 
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
3111
        :param memos_for_retrieval: An iterable containing the (index, pos,
2566
3112
            length) memo for retrieving the bytes. The Pack access method
2567
3113
            looks up the pack to use for a given record in its index_to_pack
2568
3114
            map.