/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

merge bzr.dev r4029

Show diffs side-by-side

added added

removed removed

Lines of Context:
151
151
class FTAnnotatedToUnannotated(KnitAdapter):
152
152
    """An adapter from FT annotated knits to unannotated ones."""
153
153
 
154
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
154
    def get_bytes(self, factory):
 
155
        annotated_compressed_bytes = factory._raw_record
155
156
        rec, contents = \
156
157
            self._data._parse_record_unchecked(annotated_compressed_bytes)
157
158
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
162
163
class DeltaAnnotatedToUnannotated(KnitAdapter):
163
164
    """An adapter for deltas from annotated to unannotated."""
164
165
 
165
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
166
    def get_bytes(self, factory):
 
167
        annotated_compressed_bytes = factory._raw_record
166
168
        rec, contents = \
167
169
            self._data._parse_record_unchecked(annotated_compressed_bytes)
168
170
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
175
177
class FTAnnotatedToFullText(KnitAdapter):
176
178
    """An adapter from FT annotated knits to unannotated ones."""
177
179
 
178
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
180
    def get_bytes(self, factory):
 
181
        annotated_compressed_bytes = factory._raw_record
179
182
        rec, contents = \
180
183
            self._data._parse_record_unchecked(annotated_compressed_bytes)
181
184
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
186
189
class DeltaAnnotatedToFullText(KnitAdapter):
187
190
    """An adapter for deltas from annotated to unannotated."""
188
191
 
189
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
192
    def get_bytes(self, factory):
 
193
        annotated_compressed_bytes = factory._raw_record
190
194
        rec, contents = \
191
195
            self._data._parse_record_unchecked(annotated_compressed_bytes)
192
196
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
209
213
class FTPlainToFullText(KnitAdapter):
210
214
    """An adapter from FT plain knits to unannotated ones."""
211
215
 
212
 
    def get_bytes(self, factory, compressed_bytes):
 
216
    def get_bytes(self, factory):
 
217
        compressed_bytes = factory._raw_record
213
218
        rec, contents = \
214
219
            self._data._parse_record_unchecked(compressed_bytes)
215
220
        content, delta = self._plain_factory.parse_record(factory.key[-1],
220
225
class DeltaPlainToFullText(KnitAdapter):
221
226
    """An adapter for deltas from annotated to unannotated."""
222
227
 
223
 
    def get_bytes(self, factory, compressed_bytes):
 
228
    def get_bytes(self, factory):
 
229
        compressed_bytes = factory._raw_record
224
230
        rec, contents = \
225
231
            self._data._parse_record_unchecked(compressed_bytes)
226
232
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
247
253
    """
248
254
 
249
255
    def __init__(self, key, parents, build_details, sha1, raw_record,
250
 
        annotated, knit=None):
 
256
        annotated, knit=None, network_bytes=None):
251
257
        """Create a KnitContentFactory for key.
252
258
        
253
259
        :param key: The key.
257
263
        :param sha1: The sha1 expected from the full text of this object.
258
264
        :param raw_record: The bytes of the knit data from disk.
259
265
        :param annotated: True if the raw data is annotated.
 
266
        :param network_bytes: None to calculate the network bytes on demand,
 
267
            not-none if they are already known.
260
268
        """
261
269
        ContentFactory.__init__(self)
262
270
        self.sha1 = sha1
272
280
            annotated_kind = ''
273
281
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
274
282
        self._raw_record = raw_record
 
283
        self._network_bytes = network_bytes
275
284
        self._build_details = build_details
276
285
        self._knit = knit
277
286
 
 
287
    def _create_network_bytes(self):
 
288
        """Create a fully serialised network version for transmission."""
 
289
        # storage_kind, key, parents, Noeol, raw_record
 
290
        key_bytes = '\x00'.join(self.key)
 
291
        if self.parents is None:
 
292
            parent_bytes = 'None:'
 
293
        else:
 
294
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
295
        if self._build_details[1]:
 
296
            noeol = 'N'
 
297
        else:
 
298
            noeol = ' '
 
299
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
300
            parent_bytes, noeol, self._raw_record)
 
301
        self._network_bytes = network_bytes
 
302
 
278
303
    def get_bytes_as(self, storage_kind):
279
304
        if storage_kind == self.storage_kind:
280
 
            return self._raw_record
 
305
            if self._network_bytes is None:
 
306
                self._create_network_bytes()
 
307
            return self._network_bytes
281
308
        if self._knit is not None:
282
309
            if storage_kind == 'chunked':
283
310
                return self._knit.get_lines(self.key[0])
287
314
            self.storage_kind)
288
315
 
289
316
 
 
317
class LazyKnitContentFactory(ContentFactory):
 
318
    """A ContentFactory which can either generate full text or a wire form.
 
319
 
 
320
    :seealso ContentFactory:
 
321
    """
 
322
 
 
323
    def __init__(self, key, parents, generator, first):
 
324
        """Create a LazyKnitContentFactory.
 
325
        
 
326
        :param key: The key of the record.
 
327
        :param parents: The parents of the record.
 
328
        :param generator: A _ContentMapGenerator containing the record for this
 
329
            key.
 
330
        :param first: Is this the first content object returned from generator?
 
331
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
332
            knit-delta-closure-ref
 
333
        """
 
334
        self.key = key
 
335
        self.parents = parents
 
336
        self.sha1 = None
 
337
        self._generator = generator
 
338
        self.storage_kind = "knit-delta-closure"
 
339
        if not first:
 
340
            self.storage_kind = self.storage_kind + "-ref"
 
341
        self._first = first
 
342
 
 
343
    def get_bytes_as(self, storage_kind):
 
344
        if storage_kind == self.storage_kind:
 
345
            if self._first:
 
346
                return self._generator._wire_bytes()
 
347
            else:
 
348
                # all the keys etc are contained in the bytes returned in the
 
349
                # first record.
 
350
                return ''
 
351
        if storage_kind in ('chunked', 'fulltext'):
 
352
            chunks = self._generator._get_one_work(self.key).text()
 
353
            if storage_kind == 'chunked':
 
354
                return chunks
 
355
            else:
 
356
                return ''.join(chunks)
 
357
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
358
            self.storage_kind)
 
359
 
 
360
 
 
361
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
362
    """Convert a network record to a iterator over stream records.
 
363
 
 
364
    :param storage_kind: The storage kind of the record.
 
365
        Must be 'knit-delta-closure'.
 
366
    :param bytes: The bytes of the record on the network.
 
367
    """
 
368
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
369
    return generator.get_record_stream()
 
370
 
 
371
 
 
372
def knit_network_to_record(storage_kind, bytes, line_end):
 
373
    """Convert a network record to a record object.
 
374
 
 
375
    :param storage_kind: The storage kind of the record.
 
376
    :param bytes: The bytes of the record on the network.
 
377
    """
 
378
    start = line_end
 
379
    line_end = bytes.find('\n', start)
 
380
    key = tuple(bytes[start:line_end].split('\x00'))
 
381
    start = line_end + 1
 
382
    line_end = bytes.find('\n', start)
 
383
    parent_line = bytes[start:line_end]
 
384
    if parent_line == 'None:':
 
385
        parents = None
 
386
    else:
 
387
        parents = tuple(
 
388
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
389
             if segment])
 
390
    start = line_end + 1
 
391
    noeol = bytes[start] == 'N'
 
392
    if 'ft' in storage_kind:
 
393
        method = 'fulltext'
 
394
    else:
 
395
        method = 'line-delta'
 
396
    build_details = (method, noeol)
 
397
    start = start + 1
 
398
    raw_record = bytes[start:]
 
399
    annotated = 'annotated' in storage_kind
 
400
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
401
        annotated, network_bytes=bytes)]
 
402
 
 
403
 
290
404
class KnitContent(object):
291
405
    """Content of a knit version to which deltas can be applied.
292
406
    
986
1100
            if not self.get_parent_map([key]):
987
1101
                raise RevisionNotPresent(key, self)
988
1102
            return cached_version
989
 
        text_map, contents_map = self._get_content_maps([key])
990
 
        return contents_map[key]
991
 
 
992
 
    def _get_content_maps(self, keys, nonlocal_keys=None):
993
 
        """Produce maps of text and KnitContents
994
 
        
995
 
        :param keys: The keys to produce content maps for.
996
 
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
997
 
            which are known to not be in this knit, but rather in one of the
998
 
            fallback knits.
999
 
        :return: (text_map, content_map) where text_map contains the texts for
1000
 
            the requested versions and content_map contains the KnitContents.
1001
 
        """
1002
 
        # FUTURE: This function could be improved for the 'extract many' case
1003
 
        # by tracking each component and only doing the copy when the number of
1004
 
        # children than need to apply delta's to it is > 1 or it is part of the
1005
 
        # final output.
1006
 
        keys = list(keys)
1007
 
        multiple_versions = len(keys) != 1
1008
 
        record_map = self._get_record_map(keys, allow_missing=True)
1009
 
 
1010
 
        text_map = {}
1011
 
        content_map = {}
1012
 
        final_content = {}
1013
 
        if nonlocal_keys is None:
1014
 
            nonlocal_keys = set()
1015
 
        else:
1016
 
            nonlocal_keys = frozenset(nonlocal_keys)
1017
 
        missing_keys = set(nonlocal_keys)
1018
 
        for source in self._fallback_vfs:
1019
 
            if not missing_keys:
1020
 
                break
1021
 
            for record in source.get_record_stream(missing_keys,
1022
 
                'unordered', True):
1023
 
                if record.storage_kind == 'absent':
1024
 
                    continue
1025
 
                missing_keys.remove(record.key)
1026
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1027
 
                text_map[record.key] = lines
1028
 
                content_map[record.key] = PlainKnitContent(lines, record.key)
1029
 
                if record.key in keys:
1030
 
                    final_content[record.key] = content_map[record.key]
1031
 
        for key in keys:
1032
 
            if key in nonlocal_keys:
1033
 
                # already handled
1034
 
                continue
1035
 
            components = []
1036
 
            cursor = key
1037
 
            while cursor is not None:
1038
 
                try:
1039
 
                    record, record_details, digest, next = record_map[cursor]
1040
 
                except KeyError:
1041
 
                    raise RevisionNotPresent(cursor, self)
1042
 
                components.append((cursor, record, record_details, digest))
1043
 
                cursor = next
1044
 
                if cursor in content_map:
1045
 
                    # no need to plan further back
1046
 
                    components.append((cursor, None, None, None))
1047
 
                    break
1048
 
 
1049
 
            content = None
1050
 
            for (component_id, record, record_details,
1051
 
                 digest) in reversed(components):
1052
 
                if component_id in content_map:
1053
 
                    content = content_map[component_id]
1054
 
                else:
1055
 
                    content, delta = self._factory.parse_record(key[-1],
1056
 
                        record, record_details, content,
1057
 
                        copy_base_content=multiple_versions)
1058
 
                    if multiple_versions:
1059
 
                        content_map[component_id] = content
1060
 
 
1061
 
            final_content[key] = content
1062
 
 
1063
 
            # digest here is the digest from the last applied component.
1064
 
            text = content.text()
1065
 
            actual_sha = sha_strings(text)
1066
 
            if actual_sha != digest:
1067
 
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1068
 
            text_map[key] = text
1069
 
        return text_map, final_content
 
1103
        generator = _VFContentMapGenerator(self, [key])
 
1104
        return generator._get_content(key)
1070
1105
 
1071
1106
    def get_parent_map(self, keys):
1072
1107
        """Get a map of the graph parents of keys.
1105
1140
        
1106
1141
        :return: {key:(record, record_details, digest, next)}
1107
1142
            record
1108
 
                data returned from read_records
 
1143
                data returned from read_records (a KnitContentobject)
1109
1144
            record_details
1110
1145
                opaque information to pass to parse_record
1111
1146
            digest
1117
1152
        :param allow_missing: If some records are missing, rather than 
1118
1153
            error, just return the data that could be generated.
1119
1154
        """
 
1155
        raw_map = self._get_record_map_unparsed(keys,
 
1156
            allow_missing=allow_missing)
 
1157
        return self._raw_map_to_record_map(raw_map)
 
1158
 
 
1159
    def _raw_map_to_record_map(self, raw_map):
 
1160
        """Parse the contents of _get_record_map_unparsed.
 
1161
        
 
1162
        :return: see _get_record_map.
 
1163
        """
 
1164
        result = {}
 
1165
        for key in raw_map:
 
1166
            data, record_details, next = raw_map[key]
 
1167
            content, digest = self._parse_record(key[-1], data)
 
1168
            result[key] = content, record_details, digest, next
 
1169
        return result
 
1170
 
 
1171
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1172
        """Get the raw data for reconstructing keys without parsing it.
 
1173
        
 
1174
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1175
            key-> raw_bytes, (method, noeol), compression_parent
 
1176
        """
1120
1177
        # This retries the whole request if anything fails. Potentially we
1121
1178
        # could be a bit more selective. We could track the keys whose records
1122
1179
        # we have successfully found, and then only request the new records
1132
1189
                # n = next
1133
1190
                records = [(key, i_m) for key, (r, i_m, n)
1134
1191
                                       in position_map.iteritems()]
1135
 
                record_map = {}
1136
 
                for key, record, digest in self._read_records_iter(records):
 
1192
                raw_record_map = {}
 
1193
                for key, data in self._read_records_iter_unchecked(records):
1137
1194
                    (record_details, index_memo, next) = position_map[key]
1138
 
                    record_map[key] = record, record_details, digest, next
1139
 
                return record_map
 
1195
                    raw_record_map[key] = data, record_details, next
 
1196
                return raw_record_map
1140
1197
            except errors.RetryWithNewPacks, e:
1141
1198
                self._access.reload_or_raise(e)
1142
1199
 
1206
1263
        absent_keys = keys.difference(set(positions))
1207
1264
        # There may be more absent keys : if we're missing the basis component
1208
1265
        # and are trying to include the delta closure.
 
1266
        # XXX: We should not ever need to examine remote sources because we do
 
1267
        # not permit deltas across versioned files boundaries.
1209
1268
        if include_delta_closure:
1210
1269
            needed_from_fallback = set()
1211
1270
            # Build up reconstructable_keys dict.  key:True in this dict means
1288
1347
            for prefix, keys in prefix_split_keys.iteritems():
1289
1348
                non_local = prefix_split_non_local_keys.get(prefix, [])
1290
1349
                non_local = set(non_local)
1291
 
                text_map, _ = self._get_content_maps(keys, non_local)
1292
 
                for key in keys:
1293
 
                    lines = text_map.pop(key)
1294
 
                    yield ChunkedContentFactory(key, global_map[key], None,
1295
 
                                                lines)
 
1350
                generator = _VFContentMapGenerator(self, keys, non_local,
 
1351
                    global_map)
 
1352
                for record in generator.get_record_stream():
 
1353
                    yield record
1296
1354
        else:
1297
1355
            for source, keys in source_keys:
1298
1356
                if source is parent_maps[0]:
1408
1466
                    except KeyError:
1409
1467
                        adapter_key = (record.storage_kind, "knit-ft-gz")
1410
1468
                        adapter = get_adapter(adapter_key)
1411
 
                    bytes = adapter.get_bytes(
1412
 
                        record, record.get_bytes_as(record.storage_kind))
 
1469
                    bytes = adapter.get_bytes(record)
1413
1470
                else:
1414
 
                    bytes = record.get_bytes_as(record.storage_kind)
 
1471
                    # It's a knit record, it has a _raw_record field (even if
 
1472
                    # it was reconstituted from a network stream).
 
1473
                    bytes = record._raw_record
1415
1474
                options = [record._build_details[0]]
1416
1475
                if record._build_details[1]:
1417
1476
                    options.append('no-eol')
1448
1507
            elif record.storage_kind == 'chunked':
1449
1508
                self.add_lines(record.key, parents,
1450
1509
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1451
 
            elif record.storage_kind == 'fulltext':
1452
 
                self.add_lines(record.key, parents,
1453
 
                    split_lines(record.get_bytes_as('fulltext')))
1454
1510
            else:
1455
 
                # Not a fulltext, and not suitable for direct insertion as a
 
1511
                # Not suitable for direct insertion as a
1456
1512
                # delta, either because it's not the right format, or this
1457
1513
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1458
1514
                # 0) or because it depends on a base only present in the
1459
1515
                # fallback kvfs.
1460
 
                adapter_key = record.storage_kind, 'fulltext'
1461
 
                adapter = get_adapter(adapter_key)
1462
 
                lines = split_lines(adapter.get_bytes(
1463
 
                    record, record.get_bytes_as(record.storage_kind)))
 
1516
                try:
 
1517
                    # Try getting a fulltext directly from the record.
 
1518
                    bytes = record.get_bytes_as('fulltext')
 
1519
                except errors.UnavailableRepresentation:
 
1520
                    adapter_key = record.storage_kind, 'fulltext'
 
1521
                    adapter = get_adapter(adapter_key)
 
1522
                    bytes = adapter.get_bytes(record)
 
1523
                lines = split_lines(bytes)
1464
1524
                try:
1465
1525
                    self.add_lines(record.key, parents, lines)
1466
1526
                except errors.RevisionAlreadyPresent:
1475
1535
                    added_keys.extend(
1476
1536
                        [index_entry[0] for index_entry in index_entries])
1477
1537
                    del buffered_index_entries[key]
1478
 
        # If there were any deltas which had a missing basis parent, error.
1479
1538
        if buffered_index_entries:
1480
 
            from pprint import pformat
1481
 
            raise errors.BzrCheckError(
1482
 
                "record_stream refers to compression parents not in %r:\n%s"
1483
 
                % (self, pformat(sorted(buffered_index_entries.keys()))))
 
1539
            # There were index entries buffered at the end of the stream,
 
1540
            # So these need to be added (if the index supports holding such
 
1541
            # entries for later insertion)
 
1542
            for key in buffered_index_entries:
 
1543
                index_entries = buffered_index_entries[key]
 
1544
                self._index.add_records(index_entries,
 
1545
                    missing_compression_parents=True)
 
1546
 
 
1547
    def get_missing_compression_parent_keys(self):
 
1548
        """Return an iterable of keys of missing compression parents.
 
1549
 
 
1550
        Check this after calling insert_record_stream to find out if there are
 
1551
        any missing compression parents.  If there are, the records that
 
1552
        depend on them are not able to be inserted safely. For atomic
 
1553
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1554
        suspended - commit will fail at this point. Nonatomic knits will error
 
1555
        earlier because they have no staging area to put pending entries into.
 
1556
        """
 
1557
        return self._index.get_missing_compression_parents()
1484
1558
 
1485
1559
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1486
1560
        """Iterate over the lines in the versioned files from keys.
1700
1774
        This unpacks enough of the text record to validate the id is
1701
1775
        as expected but thats all.
1702
1776
 
1703
 
        Each item the iterator yields is (key, bytes, sha1_of_full_text).
 
1777
        Each item the iterator yields is (key, bytes,
 
1778
            expected_sha1_of_full_text).
 
1779
        """
 
1780
        for key, data in self._read_records_iter_unchecked(records):
 
1781
            # validate the header (note that we can only use the suffix in
 
1782
            # current knit records).
 
1783
            df, rec = self._parse_record_header(key, data)
 
1784
            df.close()
 
1785
            yield key, data, rec[3]
 
1786
 
 
1787
    def _read_records_iter_unchecked(self, records):
 
1788
        """Read text records from data file and yield raw data.
 
1789
 
 
1790
        No validation is done.
 
1791
 
 
1792
        Yields tuples of (key, data).
1704
1793
        """
1705
1794
        # setup an iterator of the external records:
1706
1795
        # uses readv so nice and fast we hope.
1712
1801
 
1713
1802
        for key, index_memo in records:
1714
1803
            data = raw_records.next()
1715
 
            # validate the header (note that we can only use the suffix in
1716
 
            # current knit records).
1717
 
            df, rec = self._parse_record_header(key, data)
1718
 
            df.close()
1719
 
            yield key, data, rec[3]
 
1804
            yield key, data
1720
1805
 
1721
1806
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1722
1807
        """Convert key, digest, lines into a raw data block.
1766
1851
        return result
1767
1852
 
1768
1853
 
 
1854
class _ContentMapGenerator(object):
 
1855
    """Generate texts or expose raw deltas for a set of texts."""
 
1856
 
 
1857
    def _get_content(self, key):
 
1858
        """Get the content object for key."""
 
1859
        # Note that _get_content is only called when the _ContentMapGenerator
 
1860
        # has been constructed with just one key requested for reconstruction.
 
1861
        if key in self.nonlocal_keys:
 
1862
            record = self.get_record_stream().next()
 
1863
            # Create a content object on the fly
 
1864
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1865
            return PlainKnitContent(lines, record.key)
 
1866
        else:
 
1867
            # local keys we can ask for directly
 
1868
            return self._get_one_work(key)
 
1869
 
 
1870
    def get_record_stream(self):
 
1871
        """Get a record stream for the keys requested during __init__."""
 
1872
        for record in self._work():
 
1873
            yield record
 
1874
 
 
1875
    def _work(self):
 
1876
        """Produce maps of text and KnitContents as dicts.
 
1877
        
 
1878
        :return: (text_map, content_map) where text_map contains the texts for
 
1879
            the requested versions and content_map contains the KnitContents.
 
1880
        """
 
1881
        # NB: By definition we never need to read remote sources unless texts
 
1882
        # are requested from them: we don't delta across stores - and we
 
1883
        # explicitly do not want to to prevent data loss situations.
 
1884
        if self.global_map is None:
 
1885
            self.global_map = self.vf.get_parent_map(self.keys)
 
1886
        nonlocal_keys = self.nonlocal_keys
 
1887
 
 
1888
        missing_keys = set(nonlocal_keys)
 
1889
        # Read from remote versioned file instances and provide to our caller.
 
1890
        for source in self.vf._fallback_vfs:
 
1891
            if not missing_keys:
 
1892
                break
 
1893
            # Loop over fallback repositories asking them for texts - ignore
 
1894
            # any missing from a particular fallback.
 
1895
            for record in source.get_record_stream(missing_keys,
 
1896
                'unordered', True):
 
1897
                if record.storage_kind == 'absent':
 
1898
                    # Not in thie particular stream, may be in one of the
 
1899
                    # other fallback vfs objects.
 
1900
                    continue
 
1901
                missing_keys.remove(record.key)
 
1902
                yield record
 
1903
 
 
1904
        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
 
1905
            allow_missing=True)
 
1906
        first = True
 
1907
        for key in self.keys:
 
1908
            if key in self.nonlocal_keys:
 
1909
                continue
 
1910
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
1911
            first = False
 
1912
 
 
1913
    def _get_one_work(self, requested_key):
 
1914
        # Now, if we have calculated everything already, just return the
 
1915
        # desired text.
 
1916
        if requested_key in self._contents_map:
 
1917
            return self._contents_map[requested_key]
 
1918
        # To simplify things, parse everything at once - code that wants one text
 
1919
        # probably wants them all.
 
1920
        # FUTURE: This function could be improved for the 'extract many' case
 
1921
        # by tracking each component and only doing the copy when the number of
 
1922
        # children than need to apply delta's to it is > 1 or it is part of the
 
1923
        # final output.
 
1924
        multiple_versions = len(self.keys) != 1
 
1925
        if self._record_map is None:
 
1926
            self._record_map = self.vf._raw_map_to_record_map(
 
1927
                self._raw_record_map)
 
1928
        record_map = self._record_map
 
1929
        # raw_record_map is key:
 
1930
        # Have read and parsed records at this point. 
 
1931
        for key in self.keys:
 
1932
            if key in self.nonlocal_keys:
 
1933
                # already handled
 
1934
                continue
 
1935
            components = []
 
1936
            cursor = key
 
1937
            while cursor is not None:
 
1938
                try:
 
1939
                    record, record_details, digest, next = record_map[cursor]
 
1940
                except KeyError:
 
1941
                    raise RevisionNotPresent(cursor, self)
 
1942
                components.append((cursor, record, record_details, digest))
 
1943
                cursor = next
 
1944
                if cursor in self._contents_map:
 
1945
                    # no need to plan further back
 
1946
                    components.append((cursor, None, None, None))
 
1947
                    break
 
1948
 
 
1949
            content = None
 
1950
            for (component_id, record, record_details,
 
1951
                 digest) in reversed(components):
 
1952
                if component_id in self._contents_map:
 
1953
                    content = self._contents_map[component_id]
 
1954
                else:
 
1955
                    content, delta = self._factory.parse_record(key[-1],
 
1956
                        record, record_details, content,
 
1957
                        copy_base_content=multiple_versions)
 
1958
                    if multiple_versions:
 
1959
                        self._contents_map[component_id] = content
 
1960
 
 
1961
            # digest here is the digest from the last applied component.
 
1962
            text = content.text()
 
1963
            actual_sha = sha_strings(text)
 
1964
            if actual_sha != digest:
 
1965
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
1966
        if multiple_versions:
 
1967
            return self._contents_map[requested_key]
 
1968
        else:
 
1969
            return content
 
1970
 
 
1971
    def _wire_bytes(self):
 
1972
        """Get the bytes to put on the wire for 'key'.
 
1973
 
 
1974
        The first collection of bytes asked for returns the serialised
 
1975
        raw_record_map and the additional details (key, parent) for key.
 
1976
        Subsequent calls return just the additional details (key, parent).
 
1977
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
1978
        For subsequent keys it is 'knit-delta-closure-ref'.
 
1979
 
 
1980
        :param key: A key from the content generator.
 
1981
        :return: Bytes to put on the wire.
 
1982
        """
 
1983
        lines = []
 
1984
        # kind marker for dispatch on the far side,
 
1985
        lines.append('knit-delta-closure')
 
1986
        # Annotated or not
 
1987
        if self.vf._factory.annotated:
 
1988
            lines.append('annotated')
 
1989
        else:
 
1990
            lines.append('')
 
1991
        # then the list of keys
 
1992
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
1993
            if key not in self.nonlocal_keys]))
 
1994
        # then the _raw_record_map in serialised form:
 
1995
        map_byte_list = []
 
1996
        # for each item in the map:
 
1997
        # 1 line with key
 
1998
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
1999
        # one line with method
 
2000
        # one line with noeol
 
2001
        # one line with next ('' for None)
 
2002
        # one line with byte count of the record bytes
 
2003
        # the record bytes
 
2004
        for key, (record_bytes, (method, noeol), next) in \
 
2005
            self._raw_record_map.iteritems():
 
2006
            key_bytes = '\x00'.join(key)
 
2007
            parents = self.global_map.get(key, None)
 
2008
            if parents is None:
 
2009
                parent_bytes = 'None:'
 
2010
            else:
 
2011
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2012
            method_bytes = method
 
2013
            if noeol:
 
2014
                noeol_bytes = "T"
 
2015
            else:
 
2016
                noeol_bytes = "F"
 
2017
            if next:
 
2018
                next_bytes = '\x00'.join(next)
 
2019
            else:
 
2020
                next_bytes = ''
 
2021
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2022
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2023
                len(record_bytes), record_bytes))
 
2024
        map_bytes = ''.join(map_byte_list)
 
2025
        lines.append(map_bytes)
 
2026
        bytes = '\n'.join(lines)
 
2027
        return bytes
 
2028
 
 
2029
 
 
2030
class _VFContentMapGenerator(_ContentMapGenerator):
 
2031
    """Content map generator reading from a VersionedFiles object."""
 
2032
 
 
2033
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2034
        global_map=None, raw_record_map=None):
 
2035
        """Create a _ContentMapGenerator.
 
2036
        
 
2037
        :param versioned_files: The versioned files that the texts are being
 
2038
            extracted from.
 
2039
        :param keys: The keys to produce content maps for.
 
2040
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2041
            which are known to not be in this knit, but rather in one of the
 
2042
            fallback knits.
 
2043
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2044
            This is required if get_record_stream() is to be used.
 
2045
        :param raw_record_map: A unparsed raw record map to use for answering
 
2046
            contents.
 
2047
        """
 
2048
        # The vf to source data from
 
2049
        self.vf = versioned_files
 
2050
        # The keys desired
 
2051
        self.keys = list(keys)
 
2052
        # Keys known to be in fallback vfs objects
 
2053
        if nonlocal_keys is None:
 
2054
            self.nonlocal_keys = set()
 
2055
        else:
 
2056
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2057
        # Parents data for keys to be returned in get_record_stream
 
2058
        self.global_map = global_map
 
2059
        # The chunked lists for self.keys in text form
 
2060
        self._text_map = {}
 
2061
        # A cache of KnitContent objects used in extracting texts.
 
2062
        self._contents_map = {}
 
2063
        # All the knit records needed to assemble the requested keys as full
 
2064
        # texts.
 
2065
        self._record_map = None
 
2066
        if raw_record_map is None:
 
2067
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2068
                allow_missing=True)
 
2069
        else:
 
2070
            self._raw_record_map = raw_record_map
 
2071
        # the factory for parsing records
 
2072
        self._factory = self.vf._factory
 
2073
 
 
2074
 
 
2075
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2076
    """Content map generator sourced from a network stream."""
 
2077
 
 
2078
    def __init__(self, bytes, line_end):
 
2079
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2080
        self._bytes = bytes
 
2081
        self.global_map = {}
 
2082
        self._raw_record_map = {}
 
2083
        self._contents_map = {}
 
2084
        self._record_map = None
 
2085
        self.nonlocal_keys = []
 
2086
        # Get access to record parsing facilities
 
2087
        self.vf = KnitVersionedFiles(None, None)
 
2088
        start = line_end
 
2089
        # Annotated or not
 
2090
        line_end = bytes.find('\n', start)
 
2091
        line = bytes[start:line_end]
 
2092
        start = line_end + 1
 
2093
        if line == 'annotated':
 
2094
            self._factory = KnitAnnotateFactory()
 
2095
        else:
 
2096
            self._factory = KnitPlainFactory()
 
2097
        # list of keys to emit in get_record_stream
 
2098
        line_end = bytes.find('\n', start)
 
2099
        line = bytes[start:line_end]
 
2100
        start = line_end + 1
 
2101
        self.keys = [
 
2102
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2103
            if segment]
 
2104
        # now a loop until the end. XXX: It would be nice if this was just a
 
2105
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2106
        # there is a decent sized gap stopping that at the moment.
 
2107
        end = len(bytes)
 
2108
        while start < end:
 
2109
            # 1 line with key
 
2110
            line_end = bytes.find('\n', start)
 
2111
            key = tuple(bytes[start:line_end].split('\x00'))
 
2112
            start = line_end + 1
 
2113
            # 1 line with parents (None: for None, '' for ())
 
2114
            line_end = bytes.find('\n', start)
 
2115
            line = bytes[start:line_end]
 
2116
            if line == 'None:':
 
2117
                parents = None
 
2118
            else:
 
2119
                parents = tuple(
 
2120
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2121
                     if segment])
 
2122
            self.global_map[key] = parents
 
2123
            start = line_end + 1
 
2124
            # one line with method
 
2125
            line_end = bytes.find('\n', start)
 
2126
            line = bytes[start:line_end]
 
2127
            method = line
 
2128
            start = line_end + 1
 
2129
            # one line with noeol
 
2130
            line_end = bytes.find('\n', start)
 
2131
            line = bytes[start:line_end]
 
2132
            noeol = line == "T"
 
2133
            start = line_end + 1
 
2134
            # one line with next ('' for None)
 
2135
            line_end = bytes.find('\n', start)
 
2136
            line = bytes[start:line_end]
 
2137
            if not line:
 
2138
                next = None
 
2139
            else:
 
2140
                next = tuple(bytes[start:line_end].split('\x00'))
 
2141
            start = line_end + 1
 
2142
            # one line with byte count of the record bytes
 
2143
            line_end = bytes.find('\n', start)
 
2144
            line = bytes[start:line_end]
 
2145
            count = int(line)
 
2146
            start = line_end + 1
 
2147
            # the record bytes
 
2148
            record_bytes = bytes[start:start+count]
 
2149
            start = start + count
 
2150
            # put it in the map
 
2151
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2152
 
 
2153
    def get_record_stream(self):
 
2154
        """Get a record stream for for keys requested by the bytestream."""
 
2155
        first = True
 
2156
        for key in self.keys:
 
2157
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2158
            first = False
 
2159
 
 
2160
    def _wire_bytes(self):
 
2161
        return self._bytes
 
2162
 
 
2163
 
1769
2164
class _KndxIndex(object):
1770
2165
    """Manages knit index files
1771
2166
 
1837
2232
        self._reset_cache()
1838
2233
        self.has_graph = True
1839
2234
 
1840
 
    def add_records(self, records, random_id=False):
 
2235
    def add_records(self, records, random_id=False, missing_compression_parents=False):
1841
2236
        """Add multiple records to the index.
1842
2237
        
1843
2238
        :param records: a list of tuples:
1844
2239
                         (key, options, access_memo, parents).
1845
2240
        :param random_id: If True the ids being added were randomly generated
1846
2241
            and no check for existence will be performed.
 
2242
        :param missing_compression_parents: If True the records being added are
 
2243
            only compressed against texts already in the index (or inside
 
2244
            records). If False the records all refer to unavailable texts (or
 
2245
            texts inside records) as compression parents.
1847
2246
        """
 
2247
        if missing_compression_parents:
 
2248
            # It might be nice to get the edge of the records. But keys isn't
 
2249
            # _wrong_.
 
2250
            keys = sorted(record[0] for record in records)
 
2251
            raise errors.RevisionNotPresent(keys, self)
1848
2252
        paths = {}
1849
2253
        for record in records:
1850
2254
            key = record[0]
1881
2285
                self._kndx_cache[prefix] = (orig_cache, orig_history)
1882
2286
                raise
1883
2287
 
 
2288
    def scan_unvalidated_index(self, graph_index):
 
2289
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2290
        # Because kndx files do not support atomic insertion via separate index
 
2291
        # files, they do not support this method.
 
2292
        raise NotImplementedError(self.scan_unvalidated_index)
 
2293
 
 
2294
    def get_missing_compression_parents(self):
 
2295
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2296
        # Because kndx files do not support atomic insertion via separate index
 
2297
        # files, they do not support this method.
 
2298
        raise NotImplementedError(self.get_missing_compression_parents)
 
2299
    
1884
2300
    def _cache_key(self, key, options, pos, size, parent_keys):
1885
2301
        """Cache a version record in the history array and index cache.
1886
2302
 
2196
2612
                "parent tracking.")
2197
2613
        self.has_graph = parents
2198
2614
        self._is_locked = is_locked
 
2615
        self._missing_compression_parents = set()
2199
2616
 
2200
2617
    def __repr__(self):
2201
2618
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2202
2619
 
2203
 
    def add_records(self, records, random_id=False):
 
2620
    def add_records(self, records, random_id=False,
 
2621
        missing_compression_parents=False):
2204
2622
        """Add multiple records to the index.
2205
2623
        
2206
2624
        This function does not insert data into the Immutable GraphIndex
2212
2630
                         (key, options, access_memo, parents).
2213
2631
        :param random_id: If True the ids being added were randomly generated
2214
2632
            and no check for existence will be performed.
 
2633
        :param missing_compression_parents: If True the records being added are
 
2634
            only compressed against texts already in the index (or inside
 
2635
            records). If False the records all refer to unavailable texts (or
 
2636
            texts inside records) as compression parents.
2215
2637
        """
2216
2638
        if not self._add_callback:
2217
2639
            raise errors.ReadOnlyError(self)
2219
2641
        # anymore.
2220
2642
 
2221
2643
        keys = {}
 
2644
        compression_parents = set()
2222
2645
        for (key, options, access_memo, parents) in records:
2223
2646
            if self._parents:
2224
2647
                parents = tuple(parents)
2235
2658
                if self._deltas:
2236
2659
                    if 'line-delta' in options:
2237
2660
                        node_refs = (parents, (parents[0],))
 
2661
                        if missing_compression_parents:
 
2662
                            compression_parents.add(parents[0])
2238
2663
                    else:
2239
2664
                        node_refs = (parents, ())
2240
2665
                else:
2262
2687
            for key, (value, node_refs) in keys.iteritems():
2263
2688
                result.append((key, value))
2264
2689
        self._add_callback(result)
 
2690
        if missing_compression_parents:
 
2691
            # This may appear to be incorrect (it does not check for
 
2692
            # compression parents that are in the existing graph index),
 
2693
            # but such records won't have been buffered, so this is
 
2694
            # actually correct: every entry when
 
2695
            # missing_compression_parents==True either has a missing parent, or
 
2696
            # a parent that is one of the keys in records.
 
2697
            compression_parents.difference_update(keys)
 
2698
            self._missing_compression_parents.update(compression_parents)
 
2699
        # Adding records may have satisfied missing compression parents.
 
2700
        self._missing_compression_parents.difference_update(keys)
2265
2701
        
 
2702
    def scan_unvalidated_index(self, graph_index):
 
2703
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2704
 
 
2705
        This allows this _KnitGraphIndex to keep track of any missing
 
2706
        compression parents we may want to have filled in to make those
 
2707
        indices valid.
 
2708
 
 
2709
        :param graph_index: A GraphIndex
 
2710
        """
 
2711
        if self._deltas:
 
2712
            new_missing = graph_index.external_references(ref_list_num=1)
 
2713
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2714
            self._missing_compression_parents.update(new_missing)
 
2715
 
 
2716
    def get_missing_compression_parents(self):
 
2717
        """Return the keys of missing compression parents.
 
2718
 
 
2719
        Missing compression parents occur when a record stream was missing
 
2720
        basis texts, or a index was scanned that had missing basis texts.
 
2721
        """
 
2722
        return frozenset(self._missing_compression_parents)
 
2723
 
2266
2724
    def _check_read(self):
2267
2725
        """raise if reads are not permitted."""
2268
2726
        if not self._is_locked():