1
# Copyright (C) 2006-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Knit versionedfile implementation.
19
A knit is a versioned file implementation that supports efficient append only
23
lifeless: the data file is made up of "delta records". each delta record has a delta header
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
25
the -expanded data- (ie, the delta applied to the parent). the delta also ends with a
26
end-marker; simply "end VERSION"
28
delta can be line or full contents.a
29
... the 8's there are the index number of the annotation.
30
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
34
8 e.set('executable', 'yes')
36
8 if elt.get('executable') == 'yes':
37
8 ie.executable = True
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
42
09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
43
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
44
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
45
09:33 < lifeless> right
46
09:33 < jrydberg> lifeless: the position and size is the range in the data file
49
so the index sequence is the dictionary compressed sequence number used
50
in the deltas to provide line annotation
55
from io import BytesIO
58
from ..lazy_import import lazy_import
59
lazy_import(globals(), """
72
from breezy.bzr import (
77
from breezy.bzr import pack_repo
78
from breezy.i18n import gettext
85
from ..errors import (
91
from ..osutils import (
97
from ..bzr.versionedfile import (
104
VersionedFilesWithFallbacks,
108
# TODO: Split out code specific to this format into an associated object.
110
# TODO: Can we put in some kind of value to check that the index and data
111
# files belong together?
113
# TODO: accommodate binaries, perhaps by storing a byte count
115
# TODO: function to check whole file
117
# TODO: atomically append data, then measure backwards from the cursor
118
# position after writing to work out where it was located. we may need to
119
# bypass python file buffering.
121
DATA_SUFFIX = '.knit'
122
INDEX_SUFFIX = '.kndx'
123
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024
126
class KnitError(InternalBzrError):
131
class KnitCorrupt(KnitError):
133
_fmt = "Knit %(filename)s corrupt: %(how)s"
135
def __init__(self, filename, how):
136
KnitError.__init__(self)
137
self.filename = filename
141
class SHA1KnitCorrupt(KnitCorrupt):
143
_fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
144
"match expected sha-1. key %(key)s expected sha %(expected)s actual "
147
def __init__(self, filename, actual, expected, key, content):
148
KnitError.__init__(self)
149
self.filename = filename
151
self.expected = expected
153
self.content = content
156
class KnitDataStreamIncompatible(KnitError):
157
# Not raised anymore, as we can convert data streams. In future we may
158
# need it again for more exotic cases, so we're keeping it around for now.
160
_fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."
162
def __init__(self, stream_format, target_format):
163
self.stream_format = stream_format
164
self.target_format = target_format
167
class KnitDataStreamUnknown(KnitError):
168
# Indicates a data stream we don't know how to handle.
170
_fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."
172
def __init__(self, stream_format):
173
self.stream_format = stream_format
176
class KnitHeaderError(KnitError):
178
_fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'
180
def __init__(self, badline, filename):
181
KnitError.__init__(self)
182
self.badline = badline
183
self.filename = filename
186
class KnitIndexUnknownMethod(KnitError):
187
"""Raised when we don't understand the storage method.
189
Currently only 'fulltext' and 'line-delta' are supported.
192
_fmt = ("Knit index %(filename)s does not have a known method"
193
" in options: %(options)r")
195
def __init__(self, filename, options):
196
KnitError.__init__(self)
197
self.filename = filename
198
self.options = options
201
class KnitAdapter(object):
202
"""Base class for knit record adaption."""
204
def __init__(self, basis_vf):
205
"""Create an adapter which accesses full texts from basis_vf.
207
:param basis_vf: A versioned file to access basis texts of deltas from.
208
May be None for adapters that do not need to access basis texts.
210
self._data = KnitVersionedFiles(None, None)
211
self._annotate_factory = KnitAnnotateFactory()
212
self._plain_factory = KnitPlainFactory()
213
self._basis_vf = basis_vf
216
class FTAnnotatedToUnannotated(KnitAdapter):
217
"""An adapter from FT annotated knits to unannotated ones."""
219
def get_bytes(self, factory, target_storage_kind):
220
if target_storage_kind != 'knit-ft-gz':
221
raise errors.UnavailableRepresentation(
222
factory.key, target_storage_kind, factory.storage_kind)
223
annotated_compressed_bytes = factory._raw_record
225
self._data._parse_record_unchecked(annotated_compressed_bytes)
226
content = self._annotate_factory.parse_fulltext(contents, rec[1])
227
size, chunks = self._data._record_to_data(
228
(rec[1],), rec[3], content.text())
229
return b''.join(chunks)
232
class DeltaAnnotatedToUnannotated(KnitAdapter):
233
"""An adapter for deltas from annotated to unannotated."""
235
def get_bytes(self, factory, target_storage_kind):
236
if target_storage_kind != 'knit-delta-gz':
237
raise errors.UnavailableRepresentation(
238
factory.key, target_storage_kind, factory.storage_kind)
239
annotated_compressed_bytes = factory._raw_record
241
self._data._parse_record_unchecked(annotated_compressed_bytes)
242
delta = self._annotate_factory.parse_line_delta(contents, rec[1],
244
contents = self._plain_factory.lower_line_delta(delta)
245
size, chunks = self._data._record_to_data((rec[1],), rec[3], contents)
246
return b''.join(chunks)
249
class FTAnnotatedToFullText(KnitAdapter):
250
"""An adapter from FT annotated knits to unannotated ones."""
252
def get_bytes(self, factory, target_storage_kind):
253
annotated_compressed_bytes = factory._raw_record
255
self._data._parse_record_unchecked(annotated_compressed_bytes)
256
content, delta = self._annotate_factory.parse_record(factory.key[-1],
257
contents, factory._build_details, None)
258
if target_storage_kind == 'fulltext':
259
return b''.join(content.text())
260
elif target_storage_kind in ('chunked', 'lines'):
261
return content.text()
262
raise errors.UnavailableRepresentation(
263
factory.key, target_storage_kind, factory.storage_kind)
266
class DeltaAnnotatedToFullText(KnitAdapter):
267
"""An adapter for deltas from annotated to unannotated."""
269
def get_bytes(self, factory, target_storage_kind):
270
annotated_compressed_bytes = factory._raw_record
272
self._data._parse_record_unchecked(annotated_compressed_bytes)
273
delta = self._annotate_factory.parse_line_delta(contents, rec[1],
275
compression_parent = factory.parents[0]
276
basis_entry = next(self._basis_vf.get_record_stream(
277
[compression_parent], 'unordered', True))
278
if basis_entry.storage_kind == 'absent':
279
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
280
basis_lines = basis_entry.get_bytes_as('lines')
281
# Manually apply the delta because we have one annotated content and
283
basis_content = PlainKnitContent(basis_lines, compression_parent)
284
basis_content.apply_delta(delta, rec[1])
285
basis_content._should_strip_eol = factory._build_details[1]
287
if target_storage_kind == 'fulltext':
288
return b''.join(basis_content.text())
289
elif target_storage_kind in ('chunked', 'lines'):
290
return basis_content.text()
291
raise errors.UnavailableRepresentation(
292
factory.key, target_storage_kind, factory.storage_kind)
295
class FTPlainToFullText(KnitAdapter):
296
"""An adapter from FT plain knits to unannotated ones."""
298
def get_bytes(self, factory, target_storage_kind):
299
compressed_bytes = factory._raw_record
301
self._data._parse_record_unchecked(compressed_bytes)
302
content, delta = self._plain_factory.parse_record(factory.key[-1],
303
contents, factory._build_details, None)
304
if target_storage_kind == 'fulltext':
305
return b''.join(content.text())
306
elif target_storage_kind in ('chunked', 'lines'):
307
return content.text()
308
raise errors.UnavailableRepresentation(
309
factory.key, target_storage_kind, factory.storage_kind)
312
class DeltaPlainToFullText(KnitAdapter):
313
"""An adapter for deltas from annotated to unannotated."""
315
def get_bytes(self, factory, target_storage_kind):
316
compressed_bytes = factory._raw_record
318
self._data._parse_record_unchecked(compressed_bytes)
319
delta = self._plain_factory.parse_line_delta(contents, rec[1])
320
compression_parent = factory.parents[0]
321
# XXX: string splitting overhead.
322
basis_entry = next(self._basis_vf.get_record_stream(
323
[compression_parent], 'unordered', True))
324
if basis_entry.storage_kind == 'absent':
325
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
326
basis_lines = basis_entry.get_bytes_as('lines')
327
basis_content = PlainKnitContent(basis_lines, compression_parent)
328
# Manually apply the delta because we have one annotated content and
330
content, _ = self._plain_factory.parse_record(rec[1], contents,
331
factory._build_details, basis_content)
332
if target_storage_kind == 'fulltext':
333
return b''.join(content.text())
334
elif target_storage_kind in ('chunked', 'lines'):
335
return content.text()
336
raise errors.UnavailableRepresentation(
337
factory.key, target_storage_kind, factory.storage_kind)
340
class KnitContentFactory(ContentFactory):
341
"""Content factory for streaming from knits.
343
:seealso ContentFactory:
346
def __init__(self, key, parents, build_details, sha1, raw_record,
347
annotated, knit=None, network_bytes=None):
348
"""Create a KnitContentFactory for key.
351
:param parents: The parents.
352
:param build_details: The build details as returned from
354
:param sha1: The sha1 expected from the full text of this object.
355
:param raw_record: The bytes of the knit data from disk.
356
:param annotated: True if the raw data is annotated.
357
:param network_bytes: None to calculate the network bytes on demand,
358
not-none if they are already known.
360
ContentFactory.__init__(self)
363
self.parents = parents
364
if build_details[0] == 'line-delta':
369
annotated_kind = 'annotated-'
372
self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
373
self._raw_record = raw_record
374
self._network_bytes = network_bytes
375
self._build_details = build_details
378
def _create_network_bytes(self):
379
"""Create a fully serialised network version for transmission."""
380
# storage_kind, key, parents, Noeol, raw_record
381
key_bytes = b'\x00'.join(self.key)
382
if self.parents is None:
383
parent_bytes = b'None:'
385
parent_bytes = b'\t'.join(b'\x00'.join(key)
386
for key in self.parents)
387
if self._build_details[1]:
391
network_bytes = b"%s\n%s\n%s\n%s%s" % (
392
self.storage_kind.encode('ascii'), key_bytes,
393
parent_bytes, noeol, self._raw_record)
394
self._network_bytes = network_bytes
396
def get_bytes_as(self, storage_kind):
397
if storage_kind == self.storage_kind:
398
if self._network_bytes is None:
399
self._create_network_bytes()
400
return self._network_bytes
401
if ('-ft-' in self.storage_kind
402
and storage_kind in ('chunked', 'fulltext', 'lines')):
403
adapter_key = (self.storage_kind, storage_kind)
404
adapter_factory = adapter_registry.get(adapter_key)
405
adapter = adapter_factory(None)
406
return adapter.get_bytes(self, storage_kind)
407
if self._knit is not None:
408
# Not redundant with direct conversion above - that only handles
410
if storage_kind in ('chunked', 'lines'):
411
return self._knit.get_lines(self.key[0])
412
elif storage_kind == 'fulltext':
413
return self._knit.get_text(self.key[0])
414
raise errors.UnavailableRepresentation(self.key, storage_kind,
417
def iter_bytes_as(self, storage_kind):
418
return iter(self.get_bytes_as(storage_kind))
421
class LazyKnitContentFactory(ContentFactory):
422
"""A ContentFactory which can either generate full text or a wire form.
424
:seealso ContentFactory:
427
def __init__(self, key, parents, generator, first):
428
"""Create a LazyKnitContentFactory.
430
:param key: The key of the record.
431
:param parents: The parents of the record.
432
:param generator: A _ContentMapGenerator containing the record for this
434
:param first: Is this the first content object returned from generator?
435
if it is, its storage kind is knit-delta-closure, otherwise it is
436
knit-delta-closure-ref
439
self.parents = parents
442
self._generator = generator
443
self.storage_kind = "knit-delta-closure"
445
self.storage_kind = self.storage_kind + "-ref"
448
def get_bytes_as(self, storage_kind):
449
if storage_kind == self.storage_kind:
451
return self._generator._wire_bytes()
453
# all the keys etc are contained in the bytes returned in the
456
if storage_kind in ('chunked', 'fulltext', 'lines'):
457
chunks = self._generator._get_one_work(self.key).text()
458
if storage_kind in ('chunked', 'lines'):
461
return b''.join(chunks)
462
raise errors.UnavailableRepresentation(self.key, storage_kind,
465
def iter_bytes_as(self, storage_kind):
466
if storage_kind in ('chunked', 'lines'):
467
chunks = self._generator._get_one_work(self.key).text()
469
raise errors.UnavailableRepresentation(self.key, storage_kind,
473
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
474
"""Convert a network record to a iterator over stream records.
476
:param storage_kind: The storage kind of the record.
477
Must be 'knit-delta-closure'.
478
:param bytes: The bytes of the record on the network.
480
generator = _NetworkContentMapGenerator(bytes, line_end)
481
return generator.get_record_stream()
484
def knit_network_to_record(storage_kind, bytes, line_end):
485
"""Convert a network record to a record object.
487
:param storage_kind: The storage kind of the record.
488
:param bytes: The bytes of the record on the network.
491
line_end = bytes.find(b'\n', start)
492
key = tuple(bytes[start:line_end].split(b'\x00'))
494
line_end = bytes.find(b'\n', start)
495
parent_line = bytes[start:line_end]
496
if parent_line == b'None:':
500
[tuple(segment.split(b'\x00')) for segment in parent_line.split(b'\t')
503
noeol = bytes[start:start + 1] == b'N'
504
if 'ft' in storage_kind:
507
method = 'line-delta'
508
build_details = (method, noeol)
510
raw_record = bytes[start:]
511
annotated = 'annotated' in storage_kind
512
return [KnitContentFactory(key, parents, build_details, None, raw_record,
513
annotated, network_bytes=bytes)]
516
class KnitContent(object):
517
"""Content of a knit version to which deltas can be applied.
519
This is always stored in memory as a list of lines with \\n at the end,
520
plus a flag saying if the final ending is really there or not, because that
521
corresponds to the on-disk knit representation.
525
self._should_strip_eol = False
527
def apply_delta(self, delta, new_version_id):
528
"""Apply delta to this object to become new_version_id."""
529
raise NotImplementedError(self.apply_delta)
531
def line_delta_iter(self, new_lines):
532
"""Generate line-based delta from this content to new_lines."""
533
new_texts = new_lines.text()
534
old_texts = self.text()
535
s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
536
for tag, i1, i2, j1, j2 in s.get_opcodes():
539
# ofrom, oto, length, data
540
yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
542
def line_delta(self, new_lines):
543
return list(self.line_delta_iter(new_lines))
546
def get_line_delta_blocks(knit_delta, source, target):
547
"""Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
548
target_len = len(target)
551
for s_begin, s_end, t_len, new_text in knit_delta:
552
true_n = s_begin - s_pos
555
# knit deltas do not provide reliable info about whether the
556
# last line of a file matches, due to eol handling.
557
if source[s_pos + n - 1] != target[t_pos + n - 1]:
560
yield s_pos, t_pos, n
561
t_pos += t_len + true_n
563
n = target_len - t_pos
565
if source[s_pos + n - 1] != target[t_pos + n - 1]:
568
yield s_pos, t_pos, n
569
yield s_pos + (target_len - t_pos), target_len, 0
572
class AnnotatedKnitContent(KnitContent):
573
"""Annotated content."""
575
def __init__(self, lines):
576
KnitContent.__init__(self)
577
self._lines = list(lines)
580
"""Return a list of (origin, text) for each content line."""
581
lines = self._lines[:]
582
if self._should_strip_eol:
583
origin, last_line = lines[-1]
584
lines[-1] = (origin, last_line.rstrip(b'\n'))
587
def apply_delta(self, delta, new_version_id):
588
"""Apply delta to this object to become new_version_id."""
591
for start, end, count, delta_lines in delta:
592
lines[offset + start:offset + end] = delta_lines
593
offset = offset + (start - end) + count
597
lines = [text for origin, text in self._lines]
598
except ValueError as e:
599
# most commonly (only?) caused by the internal form of the knit
600
# missing annotation information because of a bug - see thread
602
raise KnitCorrupt(self,
603
"line in annotated knit missing annotation information: %s"
605
if self._should_strip_eol:
606
lines[-1] = lines[-1].rstrip(b'\n')
610
return AnnotatedKnitContent(self._lines)
613
class PlainKnitContent(KnitContent):
614
"""Unannotated content.
616
When annotate[_iter] is called on this content, the same version is reported
617
for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
621
def __init__(self, lines, version_id):
622
KnitContent.__init__(self)
624
self._version_id = version_id
627
"""Return a list of (origin, text) for each content line."""
628
return [(self._version_id, line) for line in self._lines]
630
def apply_delta(self, delta, new_version_id):
631
"""Apply delta to this object to become new_version_id."""
634
for start, end, count, delta_lines in delta:
635
lines[offset + start:offset + end] = delta_lines
636
offset = offset + (start - end) + count
637
self._version_id = new_version_id
640
return PlainKnitContent(self._lines[:], self._version_id)
644
if self._should_strip_eol:
646
lines[-1] = lines[-1].rstrip(b'\n')
650
class _KnitFactory(object):
651
"""Base class for common Factory functions."""
653
def parse_record(self, version_id, record, record_details,
654
base_content, copy_base_content=True):
655
"""Parse a record into a full content object.
657
:param version_id: The official version id for this content
658
:param record: The data returned by read_records_iter()
659
:param record_details: Details about the record returned by
661
:param base_content: If get_build_details returns a compression_parent,
662
you must return a base_content here, else use None
663
:param copy_base_content: When building from the base_content, decide
664
you can either copy it and return a new object, or modify it in
666
:return: (content, delta) A Content object and possibly a line-delta,
669
method, noeol = record_details
670
if method == 'line-delta':
671
if copy_base_content:
672
content = base_content.copy()
674
content = base_content
675
delta = self.parse_line_delta(record, version_id)
676
content.apply_delta(delta, version_id)
678
content = self.parse_fulltext(record, version_id)
680
content._should_strip_eol = noeol
681
return (content, delta)
684
class KnitAnnotateFactory(_KnitFactory):
685
"""Factory for creating annotated Content objects."""
689
def make(self, lines, version_id):
690
num_lines = len(lines)
691
return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
693
def parse_fulltext(self, content, version_id):
694
"""Convert fulltext to internal representation
696
fulltext content is of the format
697
revid(utf8) plaintext\n
698
internal representation is of the format:
701
# TODO: jam 20070209 The tests expect this to be returned as tuples,
702
# but the code itself doesn't really depend on that.
703
# Figure out a way to not require the overhead of turning the
704
# list back into tuples.
705
lines = (tuple(line.split(b' ', 1)) for line in content)
706
return AnnotatedKnitContent(lines)
708
def parse_line_delta(self, lines, version_id, plain=False):
709
"""Convert a line based delta into internal representation.
711
line delta is in the form of:
712
intstart intend intcount
714
revid(utf8) newline\n
715
internal representation is
716
(start, end, count, [1..count tuples (revid, newline)])
718
:param plain: If True, the lines are returned as a plain
719
list without annotations, not as a list of (origin, content) tuples, i.e.
720
(start, end, count, [1..count newline])
727
def cache_and_return(line):
728
origin, text = line.split(b' ', 1)
729
return cache.setdefault(origin, origin), text
731
# walk through the lines parsing.
732
# Note that the plain test is explicitly pulled out of the
733
# loop to minimise any performance impact
736
start, end, count = [int(n) for n in header.split(b',')]
737
contents = [next(lines).split(b' ', 1)[1]
738
for _ in range(count)]
739
result.append((start, end, count, contents))
742
start, end, count = [int(n) for n in header.split(b',')]
743
contents = [tuple(next(lines).split(b' ', 1))
744
for _ in range(count)]
745
result.append((start, end, count, contents))
748
def get_fulltext_content(self, lines):
749
"""Extract just the content lines from a fulltext."""
750
return (line.split(b' ', 1)[1] for line in lines)
752
def get_linedelta_content(self, lines):
753
"""Extract just the content from a line delta.
755
This doesn't return all of the extra information stored in a delta.
756
Only the actual content lines.
760
header = header.split(b',')
761
count = int(header[2])
762
for _ in range(count):
763
origin, text = next(lines).split(b' ', 1)
766
def lower_fulltext(self, content):
767
"""convert a fulltext content record into a serializable form.
769
see parse_fulltext which this inverts.
771
return [b'%s %s' % (o, t) for o, t in content._lines]
773
def lower_line_delta(self, delta):
774
"""convert a delta into a serializable form.
776
See parse_line_delta which this inverts.
778
# TODO: jam 20070209 We only do the caching thing to make sure that
779
# the origin is a valid utf-8 line, eventually we could remove it
781
for start, end, c, lines in delta:
782
out.append(b'%d,%d,%d\n' % (start, end, c))
783
out.extend(origin + b' ' + text
784
for origin, text in lines)
787
def annotate(self, knit, key):
788
content = knit._get_content(key)
789
# adjust for the fact that serialised annotations are only key suffixes
791
if isinstance(key, tuple):
793
origins = content.annotate()
795
for origin, line in origins:
796
result.append((prefix + (origin,), line))
799
# XXX: This smells a bit. Why would key ever be a non-tuple here?
800
# Aren't keys defined to be tuples? -- spiv 20080618
801
return content.annotate()
804
class KnitPlainFactory(_KnitFactory):
805
"""Factory for creating plain Content objects."""
809
def make(self, lines, version_id):
810
return PlainKnitContent(lines, version_id)
812
def parse_fulltext(self, content, version_id):
813
"""This parses an unannotated fulltext.
815
Note that this is not a noop - the internal representation
816
has (versionid, line) - its just a constant versionid.
818
return self.make(content, version_id)
820
def parse_line_delta_iter(self, lines, version_id):
822
num_lines = len(lines)
823
while cur < num_lines:
826
start, end, c = [int(n) for n in header.split(b',')]
827
yield start, end, c, lines[cur:cur + c]
830
def parse_line_delta(self, lines, version_id):
831
return list(self.parse_line_delta_iter(lines, version_id))
833
def get_fulltext_content(self, lines):
834
"""Extract just the content lines from a fulltext."""
837
def get_linedelta_content(self, lines):
838
"""Extract just the content from a line delta.
840
This doesn't return all of the extra information stored in a delta.
841
Only the actual content lines.
845
header = header.split(b',')
846
count = int(header[2])
847
for _ in range(count):
850
def lower_fulltext(self, content):
851
return content.text()
853
def lower_line_delta(self, delta):
855
for start, end, c, lines in delta:
856
out.append(b'%d,%d,%d\n' % (start, end, c))
860
def annotate(self, knit, key):
861
annotator = _KnitAnnotator(knit)
862
return annotator.annotate_flat(key)
865
def make_file_factory(annotated, mapper):
866
"""Create a factory for creating a file based KnitVersionedFiles.
868
This is only functional enough to run interface tests, it doesn't try to
869
provide a full pack environment.
871
:param annotated: knit annotations are wanted.
872
:param mapper: The mapper from keys to paths.
874
def factory(transport):
875
index = _KndxIndex(transport, mapper, lambda: None,
876
lambda: True, lambda: True)
877
access = _KnitKeyAccess(transport, mapper)
878
return KnitVersionedFiles(index, access, annotated=annotated)
882
def make_pack_factory(graph, delta, keylength):
883
"""Create a factory for creating a pack based VersionedFiles.
885
This is only functional enough to run interface tests, it doesn't try to
886
provide a full pack environment.
888
:param graph: Store a graph.
889
:param delta: Delta compress contents.
890
:param keylength: How long should keys be.
892
def factory(transport):
893
parents = graph or delta
899
max_delta_chain = 200
902
graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
903
key_elements=keylength)
904
stream = transport.open_write_stream('newpack')
905
writer = pack.ContainerWriter(stream.write)
907
index = _KnitGraphIndex(graph_index, lambda: True, parents=parents,
908
deltas=delta, add_callback=graph_index.add_nodes)
909
access = pack_repo._DirectPackAccess({})
910
access.set_writer(writer, graph_index, (transport, 'newpack'))
911
result = KnitVersionedFiles(index, access,
912
max_delta_chain=max_delta_chain)
913
result.stream = stream
914
result.writer = writer
919
def cleanup_pack_knit(versioned_files):
920
versioned_files.stream.close()
921
versioned_files.writer.end()
924
def _get_total_build_size(self, keys, positions):
925
"""Determine the total bytes to build these keys.
927
(helper function because _KnitGraphIndex and _KndxIndex work the same, but
928
don't inherit from a common base.)
930
:param keys: Keys that we want to build
931
:param positions: dict of {key, (info, index_memo, comp_parent)} (such
932
as returned by _get_components_positions)
933
:return: Number of bytes to build those keys
935
all_build_index_memos = {}
939
for key in build_keys:
940
# This is mostly for the 'stacked' case
941
# Where we will be getting the data from a fallback
942
if key not in positions:
944
_, index_memo, compression_parent = positions[key]
945
all_build_index_memos[key] = index_memo
946
if compression_parent not in all_build_index_memos:
947
next_keys.add(compression_parent)
948
build_keys = next_keys
949
return sum(index_memo[2]
950
for index_memo in all_build_index_memos.values())
953
class KnitVersionedFiles(VersionedFilesWithFallbacks):
954
"""Storage for many versioned files using knit compression.
956
Backend storage is managed by indices and data objects.
958
:ivar _index: A _KnitGraphIndex or similar that can describe the
959
parents, graph, compression and data location of entries in this
960
KnitVersionedFiles. Note that this is only the index for
961
*this* vfs; if there are fallbacks they must be queried separately.
964
def __init__(self, index, data_access, max_delta_chain=200,
965
annotated=False, reload_func=None):
966
"""Create a KnitVersionedFiles with index and data_access.
968
:param index: The index for the knit data.
969
:param data_access: The access object to store and retrieve knit
971
:param max_delta_chain: The maximum number of deltas to permit during
972
insertion. Set to 0 to prohibit the use of deltas.
973
:param annotated: Set to True to cause annotations to be calculated and
974
stored during insertion.
975
:param reload_func: An function that can be called if we think we need
976
to reload the pack listing and try again. See
977
'breezy.bzr.pack_repo.AggregateIndex' for the signature.
980
self._access = data_access
981
self._max_delta_chain = max_delta_chain
983
self._factory = KnitAnnotateFactory()
985
self._factory = KnitPlainFactory()
986
self._immediate_fallback_vfs = []
987
self._reload_func = reload_func
990
return "%s(%r, %r)" % (
991
self.__class__.__name__,
995
def without_fallbacks(self):
996
"""Return a clone of this object without any fallbacks configured."""
997
return KnitVersionedFiles(self._index, self._access,
998
self._max_delta_chain, self._factory.annotated,
1001
def add_fallback_versioned_files(self, a_versioned_files):
1002
"""Add a source of texts for texts not present in this knit.
1004
:param a_versioned_files: A VersionedFiles object.
1006
self._immediate_fallback_vfs.append(a_versioned_files)
1008
def add_lines(self, key, parents, lines, parent_texts=None,
1009
left_matching_blocks=None, nostore_sha=None, random_id=False,
1010
check_content=True):
1011
"""See VersionedFiles.add_lines()."""
1012
self._index._check_write_ok()
1013
self._check_add(key, lines, random_id, check_content)
1015
# The caller might pass None if there is no graph data, but kndx
1016
# indexes can't directly store that, so we give them
1017
# an empty tuple instead.
1019
line_bytes = b''.join(lines)
1020
return self._add(key, lines, parents,
1021
parent_texts, left_matching_blocks, nostore_sha, random_id,
1022
line_bytes=line_bytes)
1024
def add_content(self, content_factory, parent_texts=None,
1025
left_matching_blocks=None, nostore_sha=None,
1027
"""See VersionedFiles.add_content()."""
1028
self._index._check_write_ok()
1029
key = content_factory.key
1030
parents = content_factory.parents
1031
self._check_add(key, None, random_id, check_content=False)
1033
# The caller might pass None if there is no graph data, but kndx
1034
# indexes can't directly store that, so we give them
1035
# an empty tuple instead.
1037
lines = content_factory.get_bytes_as('lines')
1038
line_bytes = content_factory.get_bytes_as('fulltext')
1039
return self._add(key, lines, parents,
1040
parent_texts, left_matching_blocks, nostore_sha, random_id,
1041
line_bytes=line_bytes)
1043
def _add(self, key, lines, parents, parent_texts,
1044
left_matching_blocks, nostore_sha, random_id,
1046
"""Add a set of lines on top of version specified by parents.
1048
Any versions not present will be converted into ghosts.
1050
:param lines: A list of strings where each one is a single line (has a
1051
single newline at the end of the string) This is now optional
1052
(callers can pass None). It is left in its location for backwards
1053
compatibility. It should ''.join(lines) must == line_bytes
1054
:param line_bytes: A single string containing the content
1056
We pass both lines and line_bytes because different routes bring the
1057
values to this function. And for memory efficiency, we don't want to
1058
have to split/join on-demand.
1060
# first thing, if the content is something we don't need to store, find
1062
digest = sha_string(line_bytes)
1063
if nostore_sha == digest:
1064
raise errors.ExistingContent
1066
present_parents = []
1067
if parent_texts is None:
1069
# Do a single query to ascertain parent presence; we only compress
1070
# against parents in the same kvf.
1071
present_parent_map = self._index.get_parent_map(parents)
1072
for parent in parents:
1073
if parent in present_parent_map:
1074
present_parents.append(parent)
1076
# Currently we can only compress against the left most present parent.
1077
if (len(present_parents) == 0
1078
or present_parents[0] != parents[0]):
1081
# To speed the extract of texts the delta chain is limited
1082
# to a fixed number of deltas. This should minimize both
1083
# I/O and the time spend applying deltas.
1084
delta = self._check_should_delta(present_parents[0])
1086
text_length = len(line_bytes)
1089
# Note: line_bytes is not modified to add a newline, that is tracked
1090
# via the no_eol flag. 'lines' *is* modified, because that is the
1091
# general values needed by the Content code.
1092
if line_bytes and not line_bytes.endswith(b'\n'):
1093
options.append(b'no-eol')
1095
# Copy the existing list, or create a new one
1097
lines = osutils.split_lines(line_bytes)
1100
# Replace the last line with one that ends in a final newline
1101
lines[-1] = lines[-1] + b'\n'
1103
lines = osutils.split_lines(line_bytes)
1105
for element in key[:-1]:
1106
if not isinstance(element, bytes):
1107
raise TypeError("key contains non-bytestrings: %r" % (key,))
1109
key = key[:-1] + (b'sha1:' + digest,)
1110
elif not isinstance(key[-1], bytes):
1111
raise TypeError("key contains non-bytestrings: %r" % (key,))
1112
# Knit hunks are still last-element only
1113
version_id = key[-1]
1114
content = self._factory.make(lines, version_id)
1116
# Hint to the content object that its text() call should strip the
1118
content._should_strip_eol = True
1119
if delta or (self._factory.annotated and len(present_parents) > 0):
1120
# Merge annotations from parent texts if needed.
1121
delta_hunks = self._merge_annotations(content, present_parents,
1122
parent_texts, delta, self._factory.annotated,
1123
left_matching_blocks)
1126
options.append(b'line-delta')
1127
store_lines = self._factory.lower_line_delta(delta_hunks)
1128
size, data = self._record_to_data(key, digest, store_lines)
1130
options.append(b'fulltext')
1131
# isinstance is slower and we have no hierarchy.
1132
if self._factory.__class__ is KnitPlainFactory:
1133
# Use the already joined bytes saving iteration time in
1135
dense_lines = [line_bytes]
1137
dense_lines.append(b'\n')
1138
size, data = self._record_to_data(key, digest,
1141
# get mixed annotation + content and feed it into the
1143
store_lines = self._factory.lower_fulltext(content)
1144
size, data = self._record_to_data(key, digest, store_lines)
1146
access_memo = self._access.add_raw_record(key, size, data)
1147
self._index.add_records(
1148
((key, options, access_memo, parents),),
1149
random_id=random_id)
1150
return digest, text_length, content
1152
def annotate(self, key):
1153
"""See VersionedFiles.annotate."""
1154
return self._factory.annotate(self, key)
1156
def get_annotator(self):
1157
return _KnitAnnotator(self)
1159
def check(self, progress_bar=None, keys=None):
1160
"""See VersionedFiles.check()."""
1162
return self._logical_check()
1164
# At the moment, check does not extra work over get_record_stream
1165
return self.get_record_stream(keys, 'unordered', True)
1167
def _logical_check(self):
1168
# This doesn't actually test extraction of everything, but that will
1169
# impact 'bzr check' substantially, and needs to be integrated with
1170
# care. However, it does check for the obvious problem of a delta with
1172
keys = self._index.keys()
1173
parent_map = self.get_parent_map(keys)
1175
if self._index.get_method(key) != 'fulltext':
1176
compression_parent = parent_map[key][0]
1177
if compression_parent not in parent_map:
1178
raise KnitCorrupt(self,
1179
"Missing basis parent %s for %s" % (
1180
compression_parent, key))
1181
for fallback_vfs in self._immediate_fallback_vfs:
1182
fallback_vfs.check()
1184
def _check_add(self, key, lines, random_id, check_content):
1185
"""check that version_id and lines are safe to add."""
1186
if not all(isinstance(x, bytes) or x is None for x in key):
1187
raise TypeError(key)
1188
version_id = key[-1]
1189
if version_id is not None:
1190
if contains_whitespace(version_id):
1191
raise InvalidRevisionId(version_id, self)
1192
self.check_not_reserved_id(version_id)
1193
# TODO: If random_id==False and the key is already present, we should
1194
# probably check that the existing content is identical to what is
1195
# being inserted, and otherwise raise an exception. This would make
1196
# the bundle code simpler.
1198
self._check_lines_not_unicode(lines)
1199
self._check_lines_are_lines(lines)
1201
def _check_header(self, key, line):
1202
rec = self._split_header(line)
1203
self._check_header_version(rec, key[-1])
1206
def _check_header_version(self, rec, version_id):
1207
"""Checks the header version on original format knit records.
1209
These have the last component of the key embedded in the record.
1211
if rec[1] != version_id:
1212
raise KnitCorrupt(self,
1213
'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
1215
def _check_should_delta(self, parent):
1216
"""Iterate back through the parent listing, looking for a fulltext.
1218
This is used when we want to decide whether to add a delta or a new
1219
fulltext. It searches for _max_delta_chain parents. When it finds a
1220
fulltext parent, it sees if the total size of the deltas leading up to
1221
it is large enough to indicate that we want a new full text anyway.
1223
Return True if we should create a new delta, False if we should use a
1227
fulltext_size = None
1228
for count in range(self._max_delta_chain):
1230
# Note that this only looks in the index of this particular
1231
# KnitVersionedFiles, not in the fallbacks. This ensures that
1232
# we won't store a delta spanning physical repository
1234
build_details = self._index.get_build_details([parent])
1235
parent_details = build_details[parent]
1236
except (RevisionNotPresent, KeyError) as e:
1237
# Some basis is not locally present: always fulltext
1239
index_memo, compression_parent, _, _ = parent_details
1240
_, _, size = index_memo
1241
if compression_parent is None:
1242
fulltext_size = size
1245
# We don't explicitly check for presence because this is in an
1246
# inner loop, and if it's missing it'll fail anyhow.
1247
parent = compression_parent
1249
# We couldn't find a fulltext, so we must create a new one
1251
# Simple heuristic - if the total I/O wold be greater as a delta than
1252
# the originally installed fulltext, we create a new fulltext.
1253
return fulltext_size > delta_size
1255
def _build_details_to_components(self, build_details):
1256
"""Convert a build_details tuple to a position tuple."""
1257
# record_details, access_memo, compression_parent
1258
return build_details[3], build_details[0], build_details[1]
1260
def _get_components_positions(self, keys, allow_missing=False):
1261
"""Produce a map of position data for the components of keys.
1263
This data is intended to be used for retrieving the knit records.
1265
A dict of key to (record_details, index_memo, next, parents) is
1268
* method is the way referenced data should be applied.
1269
* index_memo is the handle to pass to the data access to actually get
1271
* next is the build-parent of the version, or None for fulltexts.
1272
* parents is the version_ids of the parents of this version
1274
:param allow_missing: If True do not raise an error on a missing
1275
component, just ignore it.
1278
pending_components = keys
1279
while pending_components:
1280
build_details = self._index.get_build_details(pending_components)
1281
current_components = set(pending_components)
1282
pending_components = set()
1283
for key, details in build_details.items():
1284
(index_memo, compression_parent, parents,
1285
record_details) = details
1286
if compression_parent is not None:
1287
pending_components.add(compression_parent)
1288
component_data[key] = self._build_details_to_components(
1290
missing = current_components.difference(build_details)
1291
if missing and not allow_missing:
1292
raise errors.RevisionNotPresent(missing.pop(), self)
1293
return component_data
1295
def _get_content(self, key, parent_texts={}):
1296
"""Returns a content object that makes up the specified
1298
cached_version = parent_texts.get(key, None)
1299
if cached_version is not None:
1300
# Ensure the cache dict is valid.
1301
if not self.get_parent_map([key]):
1302
raise RevisionNotPresent(key, self)
1303
return cached_version
1304
generator = _VFContentMapGenerator(self, [key])
1305
return generator._get_content(key)
1307
def get_parent_map(self, keys):
1308
"""Get a map of the graph parents of keys.
1310
:param keys: The keys to look up parents for.
1311
:return: A mapping from keys to parents. Absent keys are absent from
1314
return self._get_parent_map_with_sources(keys)[0]
1316
def _get_parent_map_with_sources(self, keys):
1317
"""Get a map of the parents of keys.
1319
:param keys: The keys to look up parents for.
1320
:return: A tuple. The first element is a mapping from keys to parents.
1321
Absent keys are absent from the mapping. The second element is a
1322
list with the locations each key was found in. The first element
1323
is the in-this-knit parents, the second the first fallback source,
1327
sources = [self._index] + self._immediate_fallback_vfs
1330
for source in sources:
1333
new_result = source.get_parent_map(missing)
1334
source_results.append(new_result)
1335
result.update(new_result)
1336
missing.difference_update(set(new_result))
1337
return result, source_results
1339
def _get_record_map(self, keys, allow_missing=False):
1340
"""Produce a dictionary of knit records.
1342
:return: {key:(record, record_details, digest, next)}
1344
* record: data returned from read_records (a KnitContentobject)
1345
* record_details: opaque information to pass to parse_record
1346
* digest: SHA1 digest of the full text after all steps are done
1347
* next: build-parent of the version, i.e. the leftmost ancestor.
1348
Will be None if the record is not a delta.
1350
:param keys: The keys to build a map for
1351
:param allow_missing: If some records are missing, rather than
1352
error, just return the data that could be generated.
1354
raw_map = self._get_record_map_unparsed(keys,
1355
allow_missing=allow_missing)
1356
return self._raw_map_to_record_map(raw_map)
1358
def _raw_map_to_record_map(self, raw_map):
1359
"""Parse the contents of _get_record_map_unparsed.
1361
:return: see _get_record_map.
1365
data, record_details, next = raw_map[key]
1366
content, digest = self._parse_record(key[-1], data)
1367
result[key] = content, record_details, digest, next
1370
def _get_record_map_unparsed(self, keys, allow_missing=False):
1371
"""Get the raw data for reconstructing keys without parsing it.
1373
:return: A dict suitable for parsing via _raw_map_to_record_map.
1374
key-> raw_bytes, (method, noeol), compression_parent
1376
# This retries the whole request if anything fails. Potentially we
1377
# could be a bit more selective. We could track the keys whose records
1378
# we have successfully found, and then only request the new records
1379
# from there. However, _get_components_positions grabs the whole build
1380
# chain, which means we'll likely try to grab the same records again
1381
# anyway. Also, can the build chains change as part of a pack
1382
# operation? We wouldn't want to end up with a broken chain.
1385
position_map = self._get_components_positions(keys,
1386
allow_missing=allow_missing)
1387
# key = component_id, r = record_details, i_m = index_memo,
1389
records = [(key, i_m) for key, (r, i_m, n)
1390
in position_map.items()]
1391
# Sort by the index memo, so that we request records from the
1392
# same pack file together, and in forward-sorted order
1393
records.sort(key=operator.itemgetter(1))
1395
for key, data in self._read_records_iter_unchecked(records):
1396
(record_details, index_memo, next) = position_map[key]
1397
raw_record_map[key] = data, record_details, next
1398
return raw_record_map
1399
except errors.RetryWithNewPacks as e:
1400
self._access.reload_or_raise(e)
1403
def _split_by_prefix(cls, keys):
1404
"""For the given keys, split them up based on their prefix.
1406
To keep memory pressure somewhat under control, split the
1407
requests back into per-file-id requests, otherwise "bzr co"
1408
extracts the full tree into memory before writing it to disk.
1409
This should be revisited if _get_content_maps() can ever cross
1412
The keys for a given file_id are kept in the same relative order.
1413
Ordering between file_ids is not, though prefix_order will return the
1414
order that the key was first seen.
1416
:param keys: An iterable of key tuples
1417
:return: (split_map, prefix_order)
1418
split_map A dictionary mapping prefix => keys
1419
prefix_order The order that we saw the various prefixes
1421
split_by_prefix = {}
1429
if prefix in split_by_prefix:
1430
split_by_prefix[prefix].append(key)
1432
split_by_prefix[prefix] = [key]
1433
prefix_order.append(prefix)
1434
return split_by_prefix, prefix_order
1436
def _group_keys_for_io(self, keys, non_local_keys, positions,
1437
_min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
1438
"""For the given keys, group them into 'best-sized' requests.
1440
The idea is to avoid making 1 request per file, but to never try to
1441
unpack an entire 1.5GB source tree in a single pass. Also when
1442
possible, we should try to group requests to the same pack file
1445
:return: list of (keys, non_local) tuples that indicate what keys
1446
should be fetched next.
1448
# TODO: Ideally we would group on 2 factors. We want to extract texts
1449
# from the same pack file together, and we want to extract all
1450
# the texts for a given build-chain together. Ultimately it
1451
# probably needs a better global view.
1452
total_keys = len(keys)
1453
prefix_split_keys, prefix_order = self._split_by_prefix(keys)
1454
prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
1456
cur_non_local = set()
1460
for prefix in prefix_order:
1461
keys = prefix_split_keys[prefix]
1462
non_local = prefix_split_non_local_keys.get(prefix, [])
1464
this_size = self._index._get_total_build_size(keys, positions)
1465
cur_size += this_size
1466
cur_keys.extend(keys)
1467
cur_non_local.update(non_local)
1468
if cur_size > _min_buffer_size:
1469
result.append((cur_keys, cur_non_local))
1470
sizes.append(cur_size)
1472
cur_non_local = set()
1475
result.append((cur_keys, cur_non_local))
1476
sizes.append(cur_size)
1479
def get_record_stream(self, keys, ordering, include_delta_closure):
1480
"""Get a stream of records for keys.
1482
:param keys: The keys to include.
1483
:param ordering: Either 'unordered' or 'topological'. A topologically
1484
sorted stream has compression parents strictly before their
1486
:param include_delta_closure: If True then the closure across any
1487
compression parents will be included (in the opaque data).
1488
:return: An iterator of ContentFactory objects, each of which is only
1489
valid until the iterator is advanced.
1491
# keys might be a generator
1495
if not self._index.has_graph:
1496
# Cannot sort when no graph has been stored.
1497
ordering = 'unordered'
1499
remaining_keys = keys
1502
keys = set(remaining_keys)
1503
for content_factory in self._get_remaining_record_stream(keys,
1504
ordering, include_delta_closure):
1505
remaining_keys.discard(content_factory.key)
1506
yield content_factory
1508
except errors.RetryWithNewPacks as e:
1509
self._access.reload_or_raise(e)
1511
def _get_remaining_record_stream(self, keys, ordering,
1512
include_delta_closure):
1513
"""This function is the 'retry' portion for get_record_stream."""
1514
if include_delta_closure:
1515
positions = self._get_components_positions(
1516
keys, allow_missing=True)
1518
build_details = self._index.get_build_details(keys)
1520
# (record_details, access_memo, compression_parent_key)
1521
positions = dict((key, self._build_details_to_components(details))
1522
for key, details in build_details.items())
1523
absent_keys = keys.difference(set(positions))
1524
# There may be more absent keys : if we're missing the basis component
1525
# and are trying to include the delta closure.
1526
# XXX: We should not ever need to examine remote sources because we do
1527
# not permit deltas across versioned files boundaries.
1528
if include_delta_closure:
1529
needed_from_fallback = set()
1530
# Build up reconstructable_keys dict. key:True in this dict means
1531
# the key can be reconstructed.
1532
reconstructable_keys = {}
1536
chain = [key, positions[key][2]]
1538
needed_from_fallback.add(key)
1541
while chain[-1] is not None:
1542
if chain[-1] in reconstructable_keys:
1543
result = reconstructable_keys[chain[-1]]
1547
chain.append(positions[chain[-1]][2])
1549
# missing basis component
1550
needed_from_fallback.add(chain[-1])
1553
for chain_key in chain[:-1]:
1554
reconstructable_keys[chain_key] = result
1556
needed_from_fallback.add(key)
1557
# Double index lookups here : need a unified api ?
1558
global_map, parent_maps = self._get_parent_map_with_sources(keys)
1559
if ordering in ('topological', 'groupcompress'):
1560
if ordering == 'topological':
1561
# Global topological sort
1562
present_keys = tsort.topo_sort(global_map)
1564
present_keys = sort_groupcompress(global_map)
1565
# Now group by source:
1567
current_source = None
1568
for key in present_keys:
1569
for parent_map in parent_maps:
1570
if key in parent_map:
1571
key_source = parent_map
1573
if current_source is not key_source:
1574
source_keys.append((key_source, []))
1575
current_source = key_source
1576
source_keys[-1][1].append(key)
1578
if ordering != 'unordered':
1579
raise AssertionError('valid values for ordering are:'
1580
' "unordered", "groupcompress" or "topological" not: %r'
1582
# Just group by source; remote sources first.
1585
for parent_map in reversed(parent_maps):
1586
source_keys.append((parent_map, []))
1587
for key in parent_map:
1588
present_keys.append(key)
1589
source_keys[-1][1].append(key)
1590
# We have been requested to return these records in an order that
1591
# suits us. So we ask the index to give us an optimally sorted
1593
for source, sub_keys in source_keys:
1594
if source is parent_maps[0]:
1595
# Only sort the keys for this VF
1596
self._index._sort_keys_by_io(sub_keys, positions)
1597
absent_keys = keys - set(global_map)
1598
for key in absent_keys:
1599
yield AbsentContentFactory(key)
1600
# restrict our view to the keys we can answer.
1601
# XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
1602
# XXX: At that point we need to consider the impact of double reads by
1603
# utilising components multiple times.
1604
if include_delta_closure:
1605
# XXX: get_content_maps performs its own index queries; allow state
1607
non_local_keys = needed_from_fallback - absent_keys
1608
for keys, non_local_keys in self._group_keys_for_io(present_keys,
1611
generator = _VFContentMapGenerator(self, keys, non_local_keys,
1614
for record in generator.get_record_stream():
1617
for source, keys in source_keys:
1618
if source is parent_maps[0]:
1619
# this KnitVersionedFiles
1620
records = [(key, positions[key][1]) for key in keys]
1621
for key, raw_data in self._read_records_iter_unchecked(records):
1622
(record_details, index_memo, _) = positions[key]
1623
yield KnitContentFactory(key, global_map[key],
1624
record_details, None, raw_data, self._factory.annotated, None)
1626
vf = self._immediate_fallback_vfs[parent_maps.index(
1628
for record in vf.get_record_stream(keys, ordering,
1629
include_delta_closure):
1632
def get_sha1s(self, keys):
1633
"""See VersionedFiles.get_sha1s()."""
1635
record_map = self._get_record_map(missing, allow_missing=True)
1637
for key, details in record_map.items():
1638
if key not in missing:
1640
# record entry 2 is the 'digest'.
1641
result[key] = details[2]
1642
missing.difference_update(set(result))
1643
for source in self._immediate_fallback_vfs:
1646
new_result = source.get_sha1s(missing)
1647
result.update(new_result)
1648
missing.difference_update(set(new_result))
1651
def insert_record_stream(self, stream):
1652
"""Insert a record stream into this container.
1654
:param stream: A stream of records to insert.
1656
:seealso VersionedFiles.get_record_stream:
1658
def get_adapter(adapter_key):
1660
return adapters[adapter_key]
1662
adapter_factory = adapter_registry.get(adapter_key)
1663
adapter = adapter_factory(self)
1664
adapters[adapter_key] = adapter
1667
if self._factory.annotated:
1668
# self is annotated, we need annotated knits to use directly.
1669
annotated = "annotated-"
1672
# self is not annotated, but we can strip annotations cheaply.
1674
convertibles = {"knit-annotated-ft-gz"}
1675
if self._max_delta_chain:
1676
delta_types.add("knit-annotated-delta-gz")
1677
convertibles.add("knit-annotated-delta-gz")
1678
# The set of types we can cheaply adapt without needing basis texts.
1679
native_types = set()
1680
if self._max_delta_chain:
1681
native_types.add("knit-%sdelta-gz" % annotated)
1682
delta_types.add("knit-%sdelta-gz" % annotated)
1683
native_types.add("knit-%sft-gz" % annotated)
1684
knit_types = native_types.union(convertibles)
1686
# Buffer all index entries that we can't add immediately because their
1687
# basis parent is missing. We don't buffer all because generating
1688
# annotations may require access to some of the new records. However we
1689
# can't generate annotations from new deltas until their basis parent
1690
# is present anyway, so we get away with not needing an index that
1691
# includes the new keys.
1693
# See <http://launchpad.net/bugs/300177> about ordering of compression
1694
# parents in the records - to be conservative, we insist that all
1695
# parents must be present to avoid expanding to a fulltext.
1697
# key = basis_parent, value = index entry to add
1698
buffered_index_entries = {}
1699
for record in stream:
1700
kind = record.storage_kind
1701
if kind.startswith('knit-') and kind.endswith('-gz'):
1702
# Check that the ID in the header of the raw knit bytes matches
1703
# the record metadata.
1704
raw_data = record._raw_record
1705
df, rec = self._parse_record_header(record.key, raw_data)
1708
parents = record.parents
1709
if record.storage_kind in delta_types:
1710
# TODO: eventually the record itself should track
1711
# compression_parent
1712
compression_parent = parents[0]
1714
compression_parent = None
1715
# Raise an error when a record is missing.
1716
if record.storage_kind == 'absent':
1717
raise RevisionNotPresent([record.key], self)
1718
elif ((record.storage_kind in knit_types) and
1719
(compression_parent is None or
1720
not self._immediate_fallback_vfs or
1721
compression_parent in self._index or
1722
compression_parent not in self)):
1723
# we can insert the knit record literally if either it has no
1724
# compression parent OR we already have its basis in this kvf
1725
# OR the basis is not present even in the fallbacks. In the
1726
# last case it will either turn up later in the stream and all
1727
# will be well, or it won't turn up at all and we'll raise an
1730
# TODO: self.__contains__ is somewhat redundant with
1731
# self._index.__contains__; we really want something that directly
1732
# asks if it's only present in the fallbacks. -- mbp 20081119
1733
if record.storage_kind not in native_types:
1735
adapter_key = (record.storage_kind, "knit-delta-gz")
1736
adapter = get_adapter(adapter_key)
1738
adapter_key = (record.storage_kind, "knit-ft-gz")
1739
adapter = get_adapter(adapter_key)
1740
bytes = adapter.get_bytes(record, adapter_key[1])
1742
# It's a knit record, it has a _raw_record field (even if
1743
# it was reconstituted from a network stream).
1744
bytes = record._raw_record
1745
options = [record._build_details[0].encode('ascii')]
1746
if record._build_details[1]:
1747
options.append(b'no-eol')
1748
# Just blat it across.
1749
# Note: This does end up adding data on duplicate keys. As
1750
# modern repositories use atomic insertions this should not
1751
# lead to excessive growth in the event of interrupted fetches.
1752
# 'knit' repositories may suffer excessive growth, but as a
1753
# deprecated format this is tolerable. It can be fixed if
1754
# needed by in the kndx index support raising on a duplicate
1755
# add with identical parents and options.
1756
access_memo = self._access.add_raw_record(
1757
record.key, len(bytes), [bytes])
1758
index_entry = (record.key, options, access_memo, parents)
1759
if b'fulltext' not in options:
1760
# Not a fulltext, so we need to make sure the compression
1761
# parent will also be present.
1762
# Note that pack backed knits don't need to buffer here
1763
# because they buffer all writes to the transaction level,
1764
# but we don't expose that difference at the index level. If
1765
# the query here has sufficient cost to show up in
1766
# profiling we should do that.
1768
# They're required to be physically in this
1769
# KnitVersionedFiles, not in a fallback.
1770
if compression_parent not in self._index:
1771
pending = buffered_index_entries.setdefault(
1772
compression_parent, [])
1773
pending.append(index_entry)
1776
self._index.add_records([index_entry])
1777
elif record.storage_kind in ('chunked', 'file'):
1778
self.add_lines(record.key, parents, record.get_bytes_as('lines'))
1780
# Not suitable for direct insertion as a
# delta, either because it's not the right format, or this
# KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
# 0) or because it depends on a base only present in the
# fallback kvfs.
1785
self._access.flush()
1787
# Try getting a fulltext directly from the record.
1788
lines = record.get_bytes_as('lines')
1789
except errors.UnavailableRepresentation:
1790
adapter_key = record.storage_kind, 'lines'
1791
adapter = get_adapter(adapter_key)
1792
lines = adapter.get_bytes(record, 'lines')
1794
self.add_lines(record.key, parents, lines)
1795
except errors.RevisionAlreadyPresent:
1797
# Add any records whose basis parent is now available.
1799
added_keys = [record.key]
1801
key = added_keys.pop(0)
1802
if key in buffered_index_entries:
1803
index_entries = buffered_index_entries[key]
1804
self._index.add_records(index_entries)
1806
[index_entry[0] for index_entry in index_entries])
1807
del buffered_index_entries[key]
1808
if buffered_index_entries:
1809
# There were index entries buffered at the end of the stream,
1810
# So these need to be added (if the index supports holding such
1811
# entries for later insertion)
1813
for key in buffered_index_entries:
1814
index_entries = buffered_index_entries[key]
1815
all_entries.extend(index_entries)
1816
self._index.add_records(
1817
all_entries, missing_compression_parents=True)
1819
def get_missing_compression_parent_keys(self):
    """Return an iterable of keys of missing compression parents.

    Check this after calling insert_record_stream to find out if there are
    any missing compression parents. If there are, the records that
    depend on them are not able to be inserted safely. For atomic
    KnitVersionedFiles built on packs, the transaction should be aborted or
    suspended - commit will fail at this point. Nonatomic knits will error
    earlier because they have no staging area to put pending entries into.
    """
    return self._index.get_missing_compression_parents()
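# Illustrative sketch (not part of the original code): a caller streaming
# records into a pack backed knit might check for unsatisfied compression
# parents before committing. `target_vf` and `stream` are assumed names for
# a KnitVersionedFiles instance and a record stream obtained elsewhere.
#
#     target_vf.insert_record_stream(stream)
#     missing = target_vf.get_missing_compression_parent_keys()
#     if missing:
#         # The buffered index entries cannot be validated yet; fetch the
#         # basis texts (or abort the write group) before committing.
#         handle_missing_parents(missing)  # hypothetical helper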
1831
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
    """Iterate over the lines in the versioned files from keys.

    This may return lines from other keys. Each item the returned
    iterator yields is a tuple of a line and a text version that that line
    is present in (not introduced in).

    Ordering of results is in whatever order is most suitable for the
    underlying storage format.

    If a progress bar is supplied, it may be used to indicate progress.
    The caller is responsible for cleaning up progress bars (because this
    is an iterator).

    NOTES:
     * Lines are normalised by the underlying store: they will all have \\n
       terminators.
     * Lines are returned in arbitrary order.
     * If a requested key did not change any lines (or didn't have any
       lines), it may not be mentioned at all in the result.

    :param pb: Progress bar supplied by caller.
    :return: An iterator over (line, key).
    """
    if pb is None:
        pb = ui.ui_factory.nested_progress_bar()
1862
# we don't care about inclusions, the caller cares.
1863
# but we need to setup a list of records to visit.
1864
# we need key, position, length
1866
build_details = self._index.get_build_details(keys)
1867
for key, details in build_details.items():
1869
key_records.append((key, details[0]))
1870
records_iter = enumerate(self._read_records_iter(key_records))
1871
for (key_idx, (key, data, sha_value)) in records_iter:
1872
pb.update(gettext('Walking content'), key_idx, total)
1873
compression_parent = build_details[key][1]
1874
if compression_parent is None:
1876
line_iterator = self._factory.get_fulltext_content(
1880
line_iterator = self._factory.get_linedelta_content(
1882
# Now that we are yielding the data for this key, remove it
1885
# XXX: It might be more efficient to yield (key,
1886
# line_iterator) in the future. However for now, this is a
1887
# simpler change to integrate into the rest of the
1888
# codebase. RBC 20071110
1889
for line in line_iterator:
1892
except errors.RetryWithNewPacks as e:
1893
self._access.reload_or_raise(e)
1894
# If there are still keys we've not yet found, we look in the fallback
# vfs, and hope to find them there. Note that if the keys are found
# but had no changes or no content, the fallback may not return
# anything.
1898
if keys and not self._immediate_fallback_vfs:
1899
# XXX: strictly the second parameter is meant to be the file id
1900
# but it's not easily accessible here.
1901
raise RevisionNotPresent(keys, repr(self))
1902
for source in self._immediate_fallback_vfs:
1906
for line, key in source.iter_lines_added_or_present_in_keys(keys):
1907
source_keys.add(key)
1909
keys.difference_update(source_keys)
1910
pb.update(gettext('Walking content'), total, total)
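# Illustrative sketch (not part of the original code): collecting every
# stored line for a set of keys, e.g. to grep across several text
# versions. `vf` and `keys` are assumed to be a KnitVersionedFiles
# instance and a set of key tuples.
#
#     interesting = set()
#     for line, key in vf.iter_lines_added_or_present_in_keys(keys):
#         if line.startswith(b'import '):
#             interesting.add(key)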
1912
def _make_line_delta(self, delta_seq, new_content):
1913
"""Generate a line delta from delta_seq and new_content."""
1915
for op in delta_seq.get_opcodes():
1916
if op[0] == 'equal':
1919
(op[1], op[2], op[4] - op[3], new_content._lines[op[3]:op[4]]))
1922
def _merge_annotations(self, content, parents, parent_texts={},
1923
delta=None, annotated=None,
1924
left_matching_blocks=None):
1925
"""Merge annotations for content and generate deltas.
1927
This is done by comparing the annotations based on changes to the text
1928
and generating a delta on the resulting full texts. If annotations are
1929
not being created then a simple delta is created.
1931
if left_matching_blocks is not None:
1932
delta_seq = diff._PrematchedMatcher(left_matching_blocks)
1936
for parent_key in parents:
1937
merge_content = self._get_content(parent_key, parent_texts)
1938
if (parent_key == parents[0] and delta_seq is not None):
1941
seq = patiencediff.PatienceSequenceMatcher(
1942
None, merge_content.text(), content.text())
1943
for i, j, n in seq.get_matching_blocks():
1946
# this copies (origin, text) pairs across to the new
1947
# content for any line that matches the last-checked
1949
content._lines[j:j + n] = merge_content._lines[i:i + n]
1950
# XXX: Robert says the following block is a workaround for a
1951
# now-fixed bug and it can probably be deleted. -- mbp 20080618
1952
if content._lines and not content._lines[-1][1].endswith(b'\n'):
1953
# The copied annotation was from a line without a trailing EOL,
1954
# reinstate one for the content object, to ensure correct
1956
line = content._lines[-1][1] + b'\n'
1957
content._lines[-1] = (content._lines[-1][0], line)
1959
if delta_seq is None:
1960
reference_content = self._get_content(parents[0], parent_texts)
1961
new_texts = content.text()
1962
old_texts = reference_content.text()
1963
delta_seq = patiencediff.PatienceSequenceMatcher(
1964
None, old_texts, new_texts)
1965
return self._make_line_delta(delta_seq, content)
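# Illustrative sketch (not part of the original code): _make_line_delta
# turns the non-'equal' opcodes of the sequence matcher into hunks of the
# form (parent_start, parent_end, new_line_count, new_lines). Replacing
# the second line of a three line parent would roughly produce:
#
#     old = [b'a\n', b'b\n', b'c\n']
#     new = [b'a\n', b'B\n', b'c\n']
#     delta = [(1, 2, 1, [b'B\n'])]   # splice new lines over parent[1:2]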
1967
def _parse_record(self, version_id, data):
1968
"""Parse an original format knit record.
1970
These have the last element of the key only present in the stored data.
1972
rec, record_contents = self._parse_record_unchecked(data)
1973
self._check_header_version(rec, version_id)
1974
return record_contents, rec[3]
1976
def _parse_record_header(self, key, raw_data):
1977
"""Parse a record header for consistency.
1979
:return: the header and the decompressor stream.
1980
as (stream, header_record)
1982
df = gzip.GzipFile(mode='rb', fileobj=BytesIO(raw_data))
1985
rec = self._check_header(key, df.readline())
1986
except Exception as e:
1987
raise KnitCorrupt(self,
1988
"While reading {%s} got %s(%s)"
1989
% (key, e.__class__.__name__, str(e)))
1992
def _parse_record_unchecked(self, data):
1994
# 4168 calls in 2880 217 internal
1995
# 4168 calls to _parse_record_header in 2121
1996
# 4168 calls to readlines in 330
1997
with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
1999
record_contents = df.readlines()
2000
except Exception as e:
2001
raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
2002
(data, e.__class__.__name__, str(e)))
2003
header = record_contents.pop(0)
2004
rec = self._split_header(header)
2005
last_line = record_contents.pop()
2006
if len(record_contents) != int(rec[2]):
2007
raise KnitCorrupt(self,
2008
'incorrect number of lines %s != %s'
2009
' for version {%s} %s'
2010
% (len(record_contents), int(rec[2]),
2011
rec[1], record_contents))
2012
if last_line != b'end %s\n' % rec[1]:
2013
raise KnitCorrupt(self,
2014
'unexpected version end line %r, wanted %r'
2015
% (last_line, rec[1]))
2016
return rec, record_contents
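# Illustrative sketch (not part of the original code): once gunzipped, a
# knit data record has the shape that _parse_record_unchecked pulls apart
# (hypothetical values):
#
#     version rev-1 2 <sha1-of-expanded-text>
#     <first content line>
#     <second content line>
#     end rev-1
#
# i.e. a header naming the version, line count and digest, the content
# lines themselves, and a trailing end marker.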
2018
def _read_records_iter(self, records):
2019
"""Read text records from data file and yield result.
2021
The result will be returned in whatever is the fastest to read.
2022
Not by the order requested. Also, multiple requests for the same
2023
record will only yield 1 response.
2025
:param records: A list of (key, access_memo) entries
2026
:return: Yields (key, contents, digest) in the order
2027
read, not the order requested
2032
# XXX: This smells wrong, IO may not be getting ordered right.
2033
needed_records = sorted(set(records), key=operator.itemgetter(1))
2034
if not needed_records:
2037
# The transport optimizes the fetching as well
2038
# (ie, reads continuous ranges.)
2039
raw_data = self._access.get_raw_records(
2040
[index_memo for key, index_memo in needed_records])
2042
for (key, index_memo), data in zip(needed_records, raw_data):
2043
content, digest = self._parse_record(key[-1], data)
2044
yield key, content, digest
2046
def _read_records_iter_raw(self, records):
    """Read text records from data file and yield raw data.

    This unpacks enough of the text record to validate the id is
    as expected but that's all.

    Each item the iterator yields is (key, bytes,
    expected_sha1_of_full_text).
    """
    for key, data in self._read_records_iter_unchecked(records):
        # validate the header (note that we can only use the suffix in
        # current knit records).
        df, rec = self._parse_record_header(key, data)
        df.close()
        yield key, data, rec[3]
2062
def _read_records_iter_unchecked(self, records):
2063
"""Read text records from data file and yield raw data.
2065
No validation is done.
2067
Yields tuples of (key, data).
2069
# setup an iterator of the external records:
2070
# uses readv so nice and fast we hope.
2072
# grab the disk data needed.
2073
needed_offsets = [index_memo for key, index_memo
2075
raw_records = self._access.get_raw_records(needed_offsets)
2077
for key, index_memo in records:
2078
data = next(raw_records)
2081
def _record_to_data(self, key, digest, lines, dense_lines=None):
    """Convert key, digest, lines into a raw data block.

    :param key: The key of the record. Currently keys are always serialised
        using just the trailing component.
    :param dense_lines: The bytes of lines but in a denser form. For
        instance, if lines is a list of 1000 bytestrings each ending in
        \\n, dense_lines may be a list with one line in it, containing all
        1000 lines and their \\n's. Using dense_lines if it is
        already known is a win because the string join to create bytes in
        this function spends less time resizing the final string.
    :return: (len, chunked bytestring with compressed data)
    """
2094
chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
2095
chunks.extend(dense_lines or lines)
2096
chunks.append(b"end " + key[-1] + b"\n")
2097
for chunk in chunks:
2098
if not isinstance(chunk, bytes):
2099
raise AssertionError(
2100
'data must be plain bytes was %s' % type(chunk))
2101
if lines and not lines[-1].endswith(b'\n'):
2102
raise ValueError('corrupt lines value %r' % lines)
2103
compressed_chunks = tuned_gzip.chunks_to_gzip(chunks)
2104
return sum(map(len, compressed_chunks)), compressed_chunks
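# Illustrative sketch (not part of the original code): the chunks returned
# above gunzip back to the plain record layout, so a hypothetical
# round-trip through this helper could be checked like this (`vf`, `digest`
# and `lines` are assumed to be a KnitVersionedFiles instance, the sha1
# hex digest of the joined lines, and newline-terminated bytestrings):
#
#     size, chunks = vf._record_to_data((b'rev-1',), digest, lines)
#     raw = gzip.decompress(b''.join(chunks))
#     assert raw.startswith(b'version rev-1 ')
#     assert raw.endswith(b'end rev-1\n')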
2106
def _split_header(self, line):
2109
raise KnitCorrupt(self,
2110
'unexpected number of elements in record header')
2114
"""See VersionedFiles.keys."""
2115
if 'evil' in debug.debug_flags:
2116
trace.mutter_callsite(2, "keys scales with size of history")
2117
sources = [self._index] + self._immediate_fallback_vfs
2119
for source in sources:
2120
result.update(source.keys())
2124
class _ContentMapGenerator(object):
2125
"""Generate texts or expose raw deltas for a set of texts."""
2127
def __init__(self, ordering='unordered'):
2128
self._ordering = ordering
2130
def _get_content(self, key):
2131
"""Get the content object for key."""
2132
# Note that _get_content is only called when the _ContentMapGenerator
2133
# has been constructed with just one key requested for reconstruction.
2134
if key in self.nonlocal_keys:
2135
record = next(self.get_record_stream())
2136
# Create a content object on the fly
2137
lines = record.get_bytes_as('lines')
2138
return PlainKnitContent(lines, record.key)
2140
# local keys we can ask for directly
2141
return self._get_one_work(key)
2143
def get_record_stream(self):
2144
"""Get a record stream for the keys requested during __init__."""
2145
for record in self._work():
2149
"""Produce maps of text and KnitContents as dicts.
2151
:return: (text_map, content_map) where text_map contains the texts for
2152
the requested versions and content_map contains the KnitContents.
2154
# NB: By definition we never need to read remote sources unless texts
# are requested from them: we don't delta across stores - and we
# explicitly do not want to, to prevent data loss situations.
2157
if self.global_map is None:
2158
self.global_map = self.vf.get_parent_map(self.keys)
2159
nonlocal_keys = self.nonlocal_keys
2161
missing_keys = set(nonlocal_keys)
2162
# Read from remote versioned file instances and provide to our caller.
2163
for source in self.vf._immediate_fallback_vfs:
2164
if not missing_keys:
2166
# Loop over fallback repositories asking them for texts - ignore
2167
# any missing from a particular fallback.
2168
for record in source.get_record_stream(missing_keys,
2169
self._ordering, True):
2170
if record.storage_kind == 'absent':
2171
# Not in this particular stream, may be in one of the
2172
# other fallback vfs objects.
2174
missing_keys.remove(record.key)
2177
if self._raw_record_map is None:
2178
raise AssertionError('_raw_record_map should have been filled')
2180
for key in self.keys:
2181
if key in self.nonlocal_keys:
2183
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2186
def _get_one_work(self, requested_key):
2187
# Now, if we have calculated everything already, just return the
2189
if requested_key in self._contents_map:
2190
return self._contents_map[requested_key]
2191
# To simplify things, parse everything at once - code that wants one text
2192
# probably wants them all.
2193
# FUTURE: This function could be improved for the 'extract many' case
# by tracking each component and only doing the copy when the number of
# children that need to apply deltas to it is > 1 or it is part of the
# final output.
2197
multiple_versions = len(self.keys) != 1
2198
if self._record_map is None:
2199
self._record_map = self.vf._raw_map_to_record_map(
2200
self._raw_record_map)
2201
record_map = self._record_map
2202
# raw_record_map is key:
2203
# Have read and parsed records at this point.
2204
for key in self.keys:
2205
if key in self.nonlocal_keys:
2210
while cursor is not None:
2212
record, record_details, digest, next = record_map[cursor]
2214
raise RevisionNotPresent(cursor, self)
2215
components.append((cursor, record, record_details, digest))
2217
if cursor in self._contents_map:
2218
# no need to plan further back
2219
components.append((cursor, None, None, None))
2223
for (component_id, record, record_details,
2224
digest) in reversed(components):
2225
if component_id in self._contents_map:
2226
content = self._contents_map[component_id]
2228
content, delta = self._factory.parse_record(
2229
key[-1], record, record_details, content,
2230
copy_base_content=multiple_versions)
2231
if multiple_versions:
2232
self._contents_map[component_id] = content
2234
# digest here is the digest from the last applied component.
2235
text = content.text()
2236
actual_sha = sha_strings(text)
2237
if actual_sha != digest:
2238
raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
2239
if multiple_versions:
2240
return self._contents_map[requested_key]
2244
def _wire_bytes(self):
2245
"""Get the bytes to put on the wire for 'key'.
2247
The first collection of bytes asked for returns the serialised
2248
raw_record_map and the additional details (key, parent) for key.
2249
Subsequent calls return just the additional details (key, parent).
2250
The wire storage_kind given for the first key is 'knit-delta-closure'.
For subsequent keys it is 'knit-delta-closure-ref'.
2253
:param key: A key from the content generator.
2254
:return: Bytes to put on the wire.
2257
# kind marker for dispatch on the far side,
2258
lines.append(b'knit-delta-closure')
2260
if self.vf._factory.annotated:
2261
lines.append(b'annotated')
2264
# then the list of keys
2265
lines.append(b'\t'.join(b'\x00'.join(key) for key in self.keys
2266
if key not in self.nonlocal_keys))
2267
# then the _raw_record_map in serialised form:
2269
# for each item in the map:
2271
# 1 line with parents if the key is to be yielded (None: for None, '' for ())
2272
# one line with method
2273
# one line with noeol
2274
# one line with next ('' for None)
2275
# one line with byte count of the record bytes
2277
for key, (record_bytes, (method, noeol), next) in (
2278
self._raw_record_map.items()):
2279
key_bytes = b'\x00'.join(key)
2280
parents = self.global_map.get(key, None)
2282
parent_bytes = b'None:'
2284
parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
2285
method_bytes = method.encode('ascii')
2291
next_bytes = b'\x00'.join(next)
2294
map_byte_list.append(b'\n'.join(
2295
[key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
2296
b'%d' % len(record_bytes), record_bytes]))
2297
map_bytes = b''.join(map_byte_list)
2298
lines.append(map_bytes)
2299
bytes = b'\n'.join(lines)
2303
class _VFContentMapGenerator(_ContentMapGenerator):
2304
"""Content map generator reading from a VersionedFiles object."""
2306
def __init__(self, versioned_files, keys, nonlocal_keys=None,
2307
global_map=None, raw_record_map=None, ordering='unordered'):
2308
"""Create a _ContentMapGenerator.
2310
:param versioned_files: The versioned files that the texts are being
2312
:param keys: The keys to produce content maps for.
2313
:param nonlocal_keys: An iterable of keys (possibly intersecting keys)
    which are known to not be in this knit, but rather in one of the
    fallback knits.
2316
:param global_map: The result of get_parent_map(keys) (or a supermap).
2317
This is required if get_record_stream() is to be used.
2318
:param raw_record_map: An unparsed raw record map to use for answering
    contents.
"""
2321
_ContentMapGenerator.__init__(self, ordering=ordering)
2322
# The vf to source data from
2323
self.vf = versioned_files
2325
self.keys = list(keys)
2326
# Keys known to be in fallback vfs objects
2327
if nonlocal_keys is None:
2328
self.nonlocal_keys = set()
2330
self.nonlocal_keys = frozenset(nonlocal_keys)
2331
# Parents data for keys to be returned in get_record_stream
2332
self.global_map = global_map
2333
# The chunked lists for self.keys in text form
2335
# A cache of KnitContent objects used in extracting texts.
2336
self._contents_map = {}
2337
# All the knit records needed to assemble the requested keys as full
2339
self._record_map = None
2340
if raw_record_map is None:
2341
self._raw_record_map = self.vf._get_record_map_unparsed(keys,
2344
self._raw_record_map = raw_record_map
2345
# the factory for parsing records
2346
self._factory = self.vf._factory
2349
class _NetworkContentMapGenerator(_ContentMapGenerator):
2350
"""Content map generator sourced from a network stream."""
2352
def __init__(self, bytes, line_end):
2353
"""Construct a _NetworkContentMapGenerator from a bytes block."""
2355
self.global_map = {}
2356
self._raw_record_map = {}
2357
self._contents_map = {}
2358
self._record_map = None
2359
self.nonlocal_keys = []
2360
# Get access to record parsing facilities
2361
self.vf = KnitVersionedFiles(None, None)
2364
line_end = bytes.find(b'\n', start)
2365
line = bytes[start:line_end]
2366
start = line_end + 1
2367
if line == b'annotated':
2368
self._factory = KnitAnnotateFactory()
2370
self._factory = KnitPlainFactory()
2371
# list of keys to emit in get_record_stream
2372
line_end = bytes.find(b'\n', start)
2373
line = bytes[start:line_end]
2374
start = line_end + 1
2376
tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
2378
# now a loop until the end. XXX: It would be nice if this was just a
2379
# bunch of the same records as get_record_stream(..., False) gives, but
2380
# there is a decent sized gap stopping that at the moment.
2384
line_end = bytes.find(b'\n', start)
2385
key = tuple(bytes[start:line_end].split(b'\x00'))
2386
start = line_end + 1
2387
# 1 line with parents (None: for None, '' for ())
2388
line_end = bytes.find(b'\n', start)
2389
line = bytes[start:line_end]
2390
if line == b'None:':
2394
tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
2396
self.global_map[key] = parents
2397
start = line_end + 1
2398
# one line with method
2399
line_end = bytes.find(b'\n', start)
2400
line = bytes[start:line_end]
2401
method = line.decode('ascii')
2402
start = line_end + 1
2403
# one line with noeol
2404
line_end = bytes.find(b'\n', start)
2405
line = bytes[start:line_end]
2406
noeol = line == b"T"
2407
start = line_end + 1
2408
# one line with next (b'' for None)
2409
line_end = bytes.find(b'\n', start)
2410
line = bytes[start:line_end]
2414
next = tuple(bytes[start:line_end].split(b'\x00'))
2415
start = line_end + 1
2416
# one line with byte count of the record bytes
2417
line_end = bytes.find(b'\n', start)
2418
line = bytes[start:line_end]
2420
start = line_end + 1
2422
record_bytes = bytes[start:start + count]
2423
start = start + count
2425
self._raw_record_map[key] = (record_bytes, (method, noeol), next)
2427
def get_record_stream(self):
2428
"""Get a record stream for for keys requested by the bytestream."""
2430
for key in self.keys:
2431
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2434
def _wire_bytes(self):
2438
class _KndxIndex(object):
    """Manages knit index files.

    The index is kept in memory and read on startup, to enable
    fast lookups of revision information. The cursor of the index
    file is always pointing to the end, making it easy to append
    entries.

    _cache is a cache for fast mapping from version id to a Index
    object.

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number. As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id;
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record. The same revision can be present in an index file more than once.
    The first occurrence gets assigned a sequence number starting from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        that the compressed data starts at.
    LENGTH is the ascii representation of the length of the record's data
        in the data file.
    PARENT_ID is a utf-8 revision id prefixed by a '.' that is a parent of
        REVISION_ID.
    PARENT_SEQUENCE_ID is the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.

    Partial writes:
    When a write to the index file is interrupted, it will result in a line
    that does not end in ' :'. If the ' :' is not present at the end of a line,
    or at the end of the file, then the record that is missing it will be
    ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write was
    interrupted. As a result it's normal for the last line in the index to be
    missing a trailing newline. One can be added with no harmful effects.

    :ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
        where prefix is e.g. the (fileid,) for .texts instances or () for
        constant-mapped things like .revisions, and the old state is
        tuple(history_vector, cache_dict). This is used to prevent having an
        ABI change with the C extension that reads .kndx files.
    """
2497
HEADER = b"# bzr knit index 8\n"
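# Illustrative sketch (not part of the original code): a single index line
# in the format described above, and how its fields come apart
# (hypothetical values; the real parsing lives in _load_data):
#
#     line = b'rev-2 line-delta 120 75 .other-branch-rev 0 :\n'
#     rev_id, flags, offset, length = line.split()[:4]
#     parents = line.split()[4:-1]    # [b'.other-branch-rev', b'0']
#
# b'.other-branch-rev' names a parent by full revision id, while b'0'
# refers to the record with sequence number 0 earlier in the same index.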
2499
def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
2500
"""Create a _KndxIndex on transport using mapper."""
2501
self._transport = transport
2502
self._mapper = mapper
2503
self._get_scope = get_scope
2504
self._allow_writes = allow_writes
2505
self._is_locked = is_locked
2507
self.has_graph = True
2509
def add_records(self, records, random_id=False, missing_compression_parents=False):
2510
"""Add multiple records to the index.
2512
:param records: a list of tuples:
2513
(key, options, access_memo, parents).
2514
:param random_id: If True the ids being added were randomly generated
2515
and no check for existence will be performed.
2516
:param missing_compression_parents: If True the records being added are
2517
only compressed against texts already in the index (or inside
2518
records). If False the records all refer to unavailable texts (or
2519
texts inside records) as compression parents.
2521
if missing_compression_parents:
2522
# It might be nice to get the edge of the records. But keys isn't
2524
keys = sorted(record[0] for record in records)
2525
raise errors.RevisionNotPresent(keys, self)
2527
for record in records:
2530
path = self._mapper.map(key) + '.kndx'
2531
path_keys = paths.setdefault(path, (prefix, []))
2532
path_keys[1].append(record)
2533
for path in sorted(paths):
2534
prefix, path_keys = paths[path]
2535
self._load_prefixes([prefix])
2537
orig_history = self._kndx_cache[prefix][1][:]
2538
orig_cache = self._kndx_cache[prefix][0].copy()
2541
for key, options, (_, pos, size), parents in path_keys:
2542
if not all(isinstance(option, bytes) for option in options):
2543
raise TypeError(options)
2545
# kndx indices cannot be parentless.
2549
+ key[-1], b','.join(options), b'%d' % pos, b'%d' % size,
2550
self._dictionary_compress(parents), b':'])
2551
if not isinstance(line, bytes):
2552
raise AssertionError(
2553
'data must be utf8 was %s' % type(line))
2555
self._cache_key(key, options, pos, size, parents)
2556
if len(orig_history):
2557
self._transport.append_bytes(path, b''.join(lines))
2559
self._init_index(path, lines)
2561
# If any problems happen, restore the original values and re-raise
2562
self._kndx_cache[prefix] = (orig_cache, orig_history)
2565
def scan_unvalidated_index(self, graph_index):
2566
"""See _KnitGraphIndex.scan_unvalidated_index."""
2567
# Because kndx files do not support atomic insertion via separate index
2568
# files, they do not support this method.
2569
raise NotImplementedError(self.scan_unvalidated_index)
2571
def get_missing_compression_parents(self):
2572
"""See _KnitGraphIndex.get_missing_compression_parents."""
2573
# Because kndx files do not support atomic insertion via separate index
2574
# files, they do not support this method.
2575
raise NotImplementedError(self.get_missing_compression_parents)
2577
def _cache_key(self, key, options, pos, size, parent_keys):
2578
"""Cache a version record in the history array and index cache.
2580
This is inlined into _load_data for performance. KEEP IN SYNC.
(It saves 60ms, 25% of the __init__ overhead on local 4000 record
indexes).
"""
prefix = key[:-1]
version_id = key[-1]
# last-element only for compatibility with the C load_data.
2587
parents = tuple(parent[-1] for parent in parent_keys)
2588
for parent in parent_keys:
2589
if parent[:-1] != prefix:
2590
raise ValueError("mismatched prefixes for %r, %r" % (
2592
cache, history = self._kndx_cache[prefix]
2593
# only want the _history index to reference the 1st index entry
2595
if version_id not in cache:
2596
index = len(history)
2597
history.append(version_id)
2599
index = cache[version_id][5]
2600
cache[version_id] = (version_id,
2607
def check_header(self, fp):
    line = fp.readline()
    if line == b'':
        # An empty file can actually be treated as though the file doesn't
        # exist yet.
        raise errors.NoSuchFile(self)
    if line != self.HEADER:
        raise KnitHeaderError(badline=line, filename=self)
2616
def _check_read(self):
2617
if not self._is_locked():
2618
raise errors.ObjectNotLocked(self)
2619
if self._get_scope() != self._scope:
2622
def _check_write_ok(self):
2623
"""Assert if not writes are permitted."""
2624
if not self._is_locked():
2625
raise errors.ObjectNotLocked(self)
2626
if self._get_scope() != self._scope:
2628
if self._mode != 'w':
2629
raise errors.ReadOnlyObjectDirtiedError(self)
2631
def get_build_details(self, keys):
2632
"""Get the method, index_memo and compression parent for keys.
2634
Ghosts are omitted from the result.
2636
:param keys: An iterable of keys.
2637
:return: A dict of key:(index_memo, compression_parent, parents,
2640
opaque structure to pass to read_records to extract the raw
2643
Content that this record is built upon, may be None
2645
Logical parents of this node
2647
extra information about the content which needs to be passed to
2648
Factory.parse_record
2650
parent_map = self.get_parent_map(keys)
2653
if key not in parent_map:
2655
method = self.get_method(key)
2656
if not isinstance(method, str):
2657
raise TypeError(method)
2658
parents = parent_map[key]
2659
if method == 'fulltext':
2660
compression_parent = None
2662
compression_parent = parents[0]
2663
noeol = b'no-eol' in self.get_options(key)
2664
index_memo = self.get_position(key)
2665
result[key] = (index_memo, compression_parent,
2666
parents, (method, noeol))
2669
def get_method(self, key):
2670
"""Return compression method of specified key."""
2671
options = self.get_options(key)
2672
if b'fulltext' in options:
2674
elif b'line-delta' in options:
2677
raise KnitIndexUnknownMethod(self, options)
2679
def get_options(self, key):
2680
"""Return a list representing options.
2684
prefix, suffix = self._split_key(key)
2685
self._load_prefixes([prefix])
2687
return self._kndx_cache[prefix][0][suffix][1]
2689
raise RevisionNotPresent(key, self)
2691
def find_ancestry(self, keys):
2692
"""See CombinedGraphIndex.find_ancestry()"""
2693
prefixes = set(key[:-1] for key in keys)
2694
self._load_prefixes(prefixes)
2697
missing_keys = set()
2698
pending_keys = list(keys)
2699
# This assumes that keys will not reference parents in a different
2700
# prefix, which is accurate so far.
2702
key = pending_keys.pop()
2703
if key in parent_map:
2707
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2709
missing_keys.add(key)
2711
parent_keys = tuple([prefix + (suffix,)
2712
for suffix in suffix_parents])
2713
parent_map[key] = parent_keys
2714
pending_keys.extend([p for p in parent_keys
2715
if p not in parent_map])
2716
return parent_map, missing_keys
2718
def get_parent_map(self, keys):
2719
"""Get a map of the parents of keys.
2721
:param keys: The keys to look up parents for.
2722
:return: A mapping from keys to parents. Absent keys are absent from
2725
# Parse what we need to up front, this potentially trades off I/O
2726
# locality (.kndx and .knit in the same block group for the same file
2727
# id) for less checking in inner loops.
2728
prefixes = set(key[:-1] for key in keys)
2729
self._load_prefixes(prefixes)
2734
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2738
result[key] = tuple(prefix + (suffix,) for
2739
suffix in suffix_parents)
2742
def get_position(self, key):
2743
"""Return details needed to access the version.
2745
:return: a tuple (key, data position, size) to hand to the access
2746
logic to get the record.
2748
prefix, suffix = self._split_key(key)
2749
self._load_prefixes([prefix])
2750
entry = self._kndx_cache[prefix][0][suffix]
2751
return key, entry[2], entry[3]
2753
__contains__ = _mod_index._has_key_from_parent_map
2755
def _init_index(self, path, extra_lines=[]):
2756
"""Initialize an index."""
2758
sio.write(self.HEADER)
2759
sio.writelines(extra_lines)
2761
self._transport.put_file_non_atomic(path, sio,
2762
create_parent_dir=True)
2763
# self._create_parent_dir)
2764
# mode=self._file_mode,
2765
# dir_mode=self._dir_mode)
2768
"""Get all the keys in the collection.
2770
The keys are not ordered.
2773
# Identify all key prefixes.
2774
# XXX: A bit hacky, needs polish.
2775
if isinstance(self._mapper, ConstantMapper):
2779
for quoted_relpath in self._transport.iter_files_recursive():
2780
path, ext = os.path.splitext(quoted_relpath)
2782
prefixes = [self._mapper.unmap(path) for path in relpaths]
2783
self._load_prefixes(prefixes)
2784
for prefix in prefixes:
2785
for suffix in self._kndx_cache[prefix][1]:
2786
result.add(prefix + (suffix,))
2789
def _load_prefixes(self, prefixes):
2790
"""Load the indices for prefixes."""
2792
for prefix in prefixes:
2793
if prefix not in self._kndx_cache:
2794
# the load_data interface writes to these variables.
2797
self._filename = prefix
2799
path = self._mapper.map(prefix) + '.kndx'
2800
with self._transport.get(path) as fp:
2801
# _load_data may raise NoSuchFile if the target knit is
2803
_load_data(self, fp)
2804
self._kndx_cache[prefix] = (self._cache, self._history)
2809
self._kndx_cache[prefix] = ({}, [])
2810
if isinstance(self._mapper, ConstantMapper):
2811
# preserve behaviour for revisions.kndx etc.
2812
self._init_index(path)
2817
missing_keys = _mod_index._missing_keys_from_parent_map
2819
def _partition_keys(self, keys):
2820
"""Turn keys into a dict of prefix:suffix_list."""
2823
prefix_keys = result.setdefault(key[:-1], [])
2824
prefix_keys.append(key[-1])
2827
def _dictionary_compress(self, keys):
2828
"""Dictionary compress keys.
2830
:param keys: The keys to generate references to.
2831
:return: A string representation of keys. keys which are present are
2832
dictionary compressed, and others are emitted as fulltext with a
2838
prefix = keys[0][:-1]
2839
cache = self._kndx_cache[prefix][0]
2841
if key[:-1] != prefix:
2842
# kndx indices cannot refer across partitioned storage.
2843
raise ValueError("mismatched prefixes for %r" % keys)
2844
if key[-1] in cache:
2845
# -- inlined lookup() --
2846
result_list.append(b'%d' % cache[key[-1]][5])
2847
# -- end lookup () --
2849
result_list.append(b'.' + key[-1])
2850
return b' '.join(result_list)
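# Illustrative sketch (not part of the original code): with b'rev-1'
# already cached at sequence number 0, dictionary compressing the parents
# (b'prefix', b'rev-1') and (b'prefix', b'rev-9') would yield roughly
#
#     b'0 .rev-9'
#
# i.e. known parents shrink to their sequence number while unknown ones
# fall back to a '.'-prefixed revision id.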
2852
def _reset_cache(self):
2853
# Possibly this should be a LRU cache. A dictionary from key_prefix to
2854
# (cache_dict, history_vector) for parsed kndx files.
2855
self._kndx_cache = {}
2856
self._scope = self._get_scope()
2857
allow_writes = self._allow_writes()
2863
def _sort_keys_by_io(self, keys, positions):
2864
"""Figure out an optimal order to read the records for the given keys.
2866
Sort keys, grouped by index and sorted by position.
2868
:param keys: A list of keys whose records we want to read. This will be
2870
:param positions: A dict, such as the one returned by
2871
_get_components_positions()
2874
def get_sort_key(key):
2875
index_memo = positions[key][1]
2876
# Group by prefix and position. index_memo[0] is the key, so it is
2877
# (file_id, revision_id) and we don't want to sort on revision_id,
2878
# index_memo[1] is the position, and index_memo[2] is the size,
2879
# which doesn't matter for the sort
2880
return index_memo[0][:-1], index_memo[1]
2881
return keys.sort(key=get_sort_key)
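# Illustrative sketch (not part of the original code): given memos of the
# form (key, byte_offset, size), keys sharing a prefix end up ordered by
# their byte offset (hypothetical values):
#
#     positions = {(b'f1', b'r2'): (None, ((b'f1', b'r2'), 50, 10)),
#                  (b'f1', b'r1'): (None, ((b'f1', b'r1'), 10, 20))}
#     keys = [(b'f1', b'r2'), (b'f1', b'r1')]
#     self._sort_keys_by_io(keys, positions)
#     # keys is now [(b'f1', b'r1'), (b'f1', b'r2')]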
2883
_get_total_build_size = _get_total_build_size
2885
def _split_key(self, key):
2886
"""Split key into a prefix and suffix."""
2887
# GZ 2018-07-03: This is intentionally either a sequence or bytes?
2888
if isinstance(key, bytes):
2889
return key[:-1], key[-1:]
2890
return key[:-1], key[-1]
2893
class _KnitGraphIndex(object):
2894
"""A KnitVersionedFiles index layered on GraphIndex."""
2896
def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2897
add_callback=None, track_external_parent_refs=False):
2898
"""Construct a KnitGraphIndex on a graph_index.
2900
:param graph_index: An implementation of breezy.index.GraphIndex.
2901
:param is_locked: A callback to check whether the object should answer
2903
:param deltas: Allow delta-compressed records.
2904
:param parents: If True, record knits parents, if not do not record
2906
:param add_callback: If not None, allow additions to the index and call
2907
this callback with a list of added GraphIndex nodes:
2908
[(node, value, node_refs), ...]
2909
:param is_locked: A callback, returns True if the index is locked and
2911
:param track_external_parent_refs: If True, record all external parent
2912
references parents from added records. These can be retrieved
2913
later by calling get_missing_parents().
2915
self._add_callback = add_callback
2916
self._graph_index = graph_index
2917
self._deltas = deltas
2918
self._parents = parents
2919
if deltas and not parents:
2920
# XXX: TODO: Delta tree and parent graph should be conceptually
2922
raise KnitCorrupt(self, "Cannot do delta compression without "
2924
self.has_graph = parents
2925
self._is_locked = is_locked
2926
self._missing_compression_parents = set()
2927
if track_external_parent_refs:
2928
self._key_dependencies = _KeyRefs()
2930
self._key_dependencies = None
2933
return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2935
def add_records(self, records, random_id=False,
2936
missing_compression_parents=False):
2937
"""Add multiple records to the index.
2939
This function does not insert data into the Immutable GraphIndex
2940
backing the KnitGraphIndex, instead it prepares data for insertion by
2941
the caller and checks that it is safe to insert then calls
2942
self._add_callback with the prepared GraphIndex nodes.
2944
:param records: a list of tuples:
2945
(key, options, access_memo, parents).
2946
:param random_id: If True the ids being added were randomly generated
2947
and no check for existence will be performed.
2948
:param missing_compression_parents: If True the records being added are
2949
only compressed against texts already in the index (or inside
2950
records). If False the records all refer to unavailable texts (or
2951
texts inside records) as compression parents.
2953
if not self._add_callback:
2954
raise errors.ReadOnlyError(self)
2955
# we hope there are no repositories with inconsistent parentage
2959
compression_parents = set()
2960
key_dependencies = self._key_dependencies
2961
for (key, options, access_memo, parents) in records:
2963
parents = tuple(parents)
2964
if key_dependencies is not None:
2965
key_dependencies.add_references(key, parents)
2966
index, pos, size = access_memo
2967
if b'no-eol' in options:
2971
value += b"%d %d" % (pos, size)
2972
if not self._deltas:
2973
if b'line-delta' in options:
2975
self, "attempt to add line-delta in non-delta knit")
2978
if b'line-delta' in options:
2979
node_refs = (parents, (parents[0],))
2980
if missing_compression_parents:
2981
compression_parents.add(parents[0])
2983
node_refs = (parents, ())
2985
node_refs = (parents, )
2988
raise KnitCorrupt(self, "attempt to add node with parents "
2989
"in parentless index.")
2991
keys[key] = (value, node_refs)
2994
present_nodes = self._get_entries(keys)
2995
for (index, key, value, node_refs) in present_nodes:
2996
parents = node_refs[:1]
2997
# Sometimes these are passed as a list rather than a tuple
2998
passed = static_tuple.as_tuples(keys[key])
2999
passed_parents = passed[1][:1]
3000
if (value[0:1] != keys[key][0][0:1]
3001
or parents != passed_parents):
3002
node_refs = static_tuple.as_tuples(node_refs)
3003
raise KnitCorrupt(self, "inconsistent details in add_records"
3004
": %s %s" % ((value, node_refs), passed))
3008
for key, (value, node_refs) in keys.items():
3009
result.append((key, value, node_refs))
3011
for key, (value, node_refs) in keys.items():
3012
result.append((key, value))
3013
self._add_callback(result)
3014
if missing_compression_parents:
3015
# This may appear to be incorrect (it does not check for
3016
# compression parents that are in the existing graph index),
3017
# but such records won't have been buffered, so this is
3018
# actually correct: every entry when
3019
# missing_compression_parents==True either has a missing parent, or
3020
# a parent that is one of the keys in records.
3021
compression_parents.difference_update(keys)
3022
self._missing_compression_parents.update(compression_parents)
3023
# Adding records may have satisfied missing compression parents.
3024
self._missing_compression_parents.difference_update(keys)
3026
def scan_unvalidated_index(self, graph_index):
3027
"""Inform this _KnitGraphIndex that there is an unvalidated index.
3029
This allows this _KnitGraphIndex to keep track of any missing
3030
compression parents we may want to have filled in to make those
3033
:param graph_index: A GraphIndex
3036
new_missing = graph_index.external_references(ref_list_num=1)
3037
new_missing.difference_update(self.get_parent_map(new_missing))
3038
self._missing_compression_parents.update(new_missing)
3039
if self._key_dependencies is not None:
3040
# Add parent refs from graph_index (and discard parent refs that
3041
# the graph_index has).
3042
for node in graph_index.iter_all_entries():
3043
self._key_dependencies.add_references(node[1], node[3][0])
3045
def get_missing_compression_parents(self):
3046
"""Return the keys of missing compression parents.
3048
Missing compression parents occur when a record stream was missing
3049
basis texts, or an index was scanned that had missing basis texts.
3051
return frozenset(self._missing_compression_parents)
3053
def get_missing_parents(self):
3054
"""Return the keys of missing parents."""
3055
# If updating this, you should also update
3056
# groupcompress._GCGraphIndex.get_missing_parents
3057
# We may have false positives, so filter those out.
3058
self._key_dependencies.satisfy_refs_for_keys(
3059
self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
3060
return frozenset(self._key_dependencies.get_unsatisfied_refs())
3062
def _check_read(self):
3063
"""raise if reads are not permitted."""
3064
if not self._is_locked():
3065
raise errors.ObjectNotLocked(self)
3067
def _check_write_ok(self):
3068
"""Assert if writes are not permitted."""
3069
if not self._is_locked():
3070
raise errors.ObjectNotLocked(self)
3072
def _compression_parent(self, an_entry):
3073
# return the key that an_entry is compressed against, or None
3074
# Grab the second parent list (as deltas implies parents currently)
3075
compression_parents = an_entry[3][1]
3076
if not compression_parents:
3078
if len(compression_parents) != 1:
3079
raise AssertionError(
3080
"Too many compression parents: %r" % compression_parents)
3081
return compression_parents[0]
3083
def get_build_details(self, keys):
3084
"""Get the method, index_memo and compression parent for version_ids.
3086
Ghosts are omitted from the result.
3088
:param keys: An iterable of keys.
3089
:return: A dict of key:
3090
(index_memo, compression_parent, parents, record_details).
3092
opaque structure to pass to read_records to extract the raw
3095
Content that this record is built upon, may be None
3097
Logical parents of this node
3099
extra information about the content which needs to be passed to
3100
Factory.parse_record
3104
entries = self._get_entries(keys, False)
3105
for entry in entries:
3107
if not self._parents:
3110
parents = entry[3][0]
3111
if not self._deltas:
3112
compression_parent_key = None
3114
compression_parent_key = self._compression_parent(entry)
3115
noeol = (entry[2][0:1] == b'N')
3116
if compression_parent_key:
3117
method = 'line-delta'
3120
result[key] = (self._node_to_position(entry),
3121
compression_parent_key, parents,
3125
def _get_entries(self, keys, check_present=False):
3126
"""Get the entries for keys.
3128
:param keys: An iterable of index key tuples.
3133
for node in self._graph_index.iter_entries(keys):
3135
found_keys.add(node[1])
3137
# adapt parentless index to the rest of the code.
3138
for node in self._graph_index.iter_entries(keys):
3139
yield node[0], node[1], node[2], ()
3140
found_keys.add(node[1])
3142
missing_keys = keys.difference(found_keys)
3144
raise RevisionNotPresent(missing_keys.pop(), self)
3146
def get_method(self, key):
3147
"""Return compression method of specified key."""
3148
return self._get_method(self._get_node(key))
3150
def _get_method(self, node):
3151
if not self._deltas:
3153
if self._compression_parent(node):
3158
def _get_node(self, key):
3160
return list(self._get_entries([key]))[0]
3162
raise RevisionNotPresent(key, self)
3164
def get_options(self, key):
3165
"""Return a list representing options.
3169
node = self._get_node(key)
3170
options = [self._get_method(node).encode('ascii')]
3171
if node[2][0:1] == b'N':
3172
options.append(b'no-eol')
3175
def find_ancestry(self, keys):
3176
"""See CombinedGraphIndex.find_ancestry()"""
3177
return self._graph_index.find_ancestry(keys, 0)
3179
def get_parent_map(self, keys):
3180
"""Get a map of the parents of keys.
3182
:param keys: The keys to look up parents for.
3183
:return: A mapping from keys to parents. Absent keys are absent from
3187
nodes = self._get_entries(keys)
3191
result[node[1]] = node[3][0]
3194
result[node[1]] = None
3197
def get_position(self, key):
3198
"""Return details needed to access the version.
3200
:return: a tuple (index, data position, size) to hand to the access
3201
logic to get the record.
3203
node = self._get_node(key)
3204
return self._node_to_position(node)
3206
__contains__ = _mod_index._has_key_from_parent_map
3209
"""Get all the keys in the collection.
3211
The keys are not ordered.
3214
return [node[1] for node in self._graph_index.iter_all_entries()]
3216
missing_keys = _mod_index._missing_keys_from_parent_map
3218
def _node_to_position(self, node):
3219
"""Convert an index value to position details."""
3220
bits = node[2][1:].split(b' ')
3221
return node[0], int(bits[0]), int(bits[1])
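# Illustrative sketch (not part of the original code): the value written by
# add_records is a one byte no-eol flag followed by "<offset> <size>", so
# decoding mirrors the encoding (hypothetical values):
#
#     value = b'N1024 210'            # no-eol record, 210 bytes at offset 1024
#     bits = value[1:].split(b' ')    # [b'1024', b'210']
#     index_memo = (graph_index, int(bits[0]), int(bits[1]))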
3223
def _sort_keys_by_io(self, keys, positions):
3224
"""Figure out an optimal order to read the records for the given keys.
3226
Sort keys, grouped by index and sorted by position.
3228
:param keys: A list of keys whose records we want to read. This will be
3230
:param positions: A dict, such as the one returned by
3231
_get_components_positions()
3234
def get_index_memo(key):
3235
# index_memo is at offset [1]. It is made up of (GraphIndex,
3236
# position, size). GI is an object, which will be unique for each
3237
# pack file. This causes us to group by pack file, then sort by
3238
# position. Size doesn't matter, but it isn't worth breaking up the
3240
return positions[key][1]
3241
return keys.sort(key=get_index_memo)
3243
_get_total_build_size = _get_total_build_size
3246
class _KnitKeyAccess(object):
3247
"""Access to records in .knit files."""
3249
def __init__(self, transport, mapper):
3250
"""Create a _KnitKeyAccess with transport and mapper.
3252
:param transport: The transport the access object is rooted at.
3253
:param mapper: The mapper used to map keys to .knit files.
3255
self._transport = transport
3256
self._mapper = mapper
3258
def add_raw_record(self, key, size, raw_data):
3259
"""Add raw knit bytes to a storage area.
3261
The data is spooled to the container writer in one bytes-record per
3264
:param key: The key of the raw data segment
3265
:param size: The size of the raw data segment
3266
:param raw_data: A chunked bytestring containing the data.
3267
:return: opaque index memo to retrieve the record later.
3268
For _KnitKeyAccess the memo is (key, pos, length), where the key is
3271
path = self._mapper.map(key)
3273
base = self._transport.append_bytes(path + '.knit', b''.join(raw_data))
3274
except errors.NoSuchFile:
3275
self._transport.mkdir(osutils.dirname(path))
3276
base = self._transport.append_bytes(path + '.knit', b''.join(raw_data))
3279
return (key, base, size)
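# Illustrative sketch (not part of the original code): the memo returned
# above can be handed straight back to get_raw_records later (`access` and
# `raw` are assumed to be a _KnitKeyAccess instance and a gzipped knit
# record as produced by _record_to_data):
#
#     memo = access.add_raw_record((b'file-id', b'rev-1'), len(raw), [raw])
#     stored = b''.join(access.get_raw_records([memo]))
#     assert stored == raw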
3281
def add_raw_records(self, key_sizes, raw_data):
3282
"""Add raw knit bytes to a storage area.
3284
The data is spooled to the container writer in one bytes-record per
3287
:param sizes: An iterable of tuples containing the key and size of each
3289
:param raw_data: A chunked bytestring containing the data.
3290
:return: A list of memos to retrieve the record later. Each memo is an
3291
opaque index memo. For _KnitKeyAccess the memo is (key, pos,
3292
length), where the key is the record key.
3294
raw_data = b''.join(raw_data)
3295
if not isinstance(raw_data, bytes):
3296
raise AssertionError(
3297
'data must be plain bytes was %s' % type(raw_data))
3300
# TODO: This can be tuned for writing to sftp and other servers where
3301
# append() is relatively expensive by grouping the writes to each key
3303
for key, size in key_sizes:
3304
record_bytes = [raw_data[offset:offset + size]]
3305
result.append(self.add_raw_record(key, size, record_bytes))
3310
"""Flush pending writes on this access object.
3312
For .knit files this is a no-op.
3316
def get_raw_records(self, memos_for_retrieval):
3317
"""Get the raw bytes for a records.
3319
:param memos_for_retrieval: An iterable containing the access memo for
3320
retrieving the bytes.
3321
:return: An iterator over the bytes of the records.
3323
# first pass, group into same-index request to minimise readv's issued.
3325
current_prefix = None
3326
for (key, offset, length) in memos_for_retrieval:
3327
if current_prefix == key[:-1]:
3328
current_list.append((offset, length))
3330
if current_prefix is not None:
3331
request_lists.append((current_prefix, current_list))
3332
current_prefix = key[:-1]
3333
current_list = [(offset, length)]
3334
# handle the last entry
3335
if current_prefix is not None:
3336
request_lists.append((current_prefix, current_list))
3337
for prefix, read_vector in request_lists:
3338
path = self._mapper.map(prefix) + '.knit'
3339
for pos, data in self._transport.readv(path, read_vector):
3343
def annotate_knit(knit, revision_id):
3344
"""Annotate a knit with no cached annotations.
3346
This implementation is for knits with no cached annotations.
3347
It will work for knits with cached annotations, but this is not
3350
annotator = _KnitAnnotator(knit)
3351
return iter(annotator.annotate_flat(revision_id))
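# Illustrative sketch (not part of the original code): annotating one
# version of a knit and printing which revision each line came from
# (`knit` and `revision_id` are assumed to exist as in the function above):
#
#     for origin_key, line in annotate_knit(knit, revision_id):
#         print(origin_key, line.rstrip(b'\n'))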
3354
class _KnitAnnotator(annotate.Annotator):
3355
"""Build up the annotations for a text."""
3357
def __init__(self, vf):
3358
annotate.Annotator.__init__(self, vf)
3360
# TODO: handle Nodes which cannot be extracted
3361
# self._ghosts = set()
3363
# Map from (key, parent_key) => matching_blocks, should be 'use once'
3364
self._matching_blocks = {}
3366
# KnitContent objects
3367
self._content_objects = {}
3368
# The number of children that depend on this fulltext content object
3369
self._num_compression_children = {}
3370
# Delta records that need their compression parent before they can be
3372
self._pending_deltas = {}
3373
# Fulltext records that are waiting for their parents' fulltexts before
3374
# they can be yielded for annotation
3375
self._pending_annotation = {}
3377
self._all_build_details = {}
3379
def _get_build_graph(self, key):
3380
"""Get the graphs for building texts and annotations.
3382
The data you need for creating a full text may be different than the
3383
data you need to annotate that text. (At a minimum, you need both
3384
parents to create an annotation, but only need 1 parent to generate the
3387
:return: A list of (key, index_memo) records, suitable for
3388
passing to read_records_iter to start reading in the raw data from
3394
self._num_needed_children[key] = 1
3396
# get all pending nodes
3397
this_iteration = pending
3398
build_details = self._vf._index.get_build_details(this_iteration)
3399
self._all_build_details.update(build_details)
3400
# new_nodes = self._vf._index._get_entries(this_iteration)
3402
for key, details in build_details.items():
3403
(index_memo, compression_parent, parent_keys,
3404
record_details) = details
3405
self._parent_map[key] = parent_keys
3406
self._heads_provider = None
3407
records.append((key, index_memo))
3408
# Do we actually need to check _annotated_lines?
3409
pending.update([p for p in parent_keys
3410
if p not in self._all_build_details])
3412
for parent_key in parent_keys:
3413
if parent_key in self._num_needed_children:
3414
self._num_needed_children[parent_key] += 1
3416
self._num_needed_children[parent_key] = 1
3417
if compression_parent:
3418
if compression_parent in self._num_compression_children:
3419
self._num_compression_children[compression_parent] += 1
3421
self._num_compression_children[compression_parent] = 1
3423
missing_versions = this_iteration.difference(build_details)
3424
if missing_versions:
3425
for key in missing_versions:
3426
if key in self._parent_map and key in self._text_cache:
3427
# We already have this text ready, we just need to
3428
# yield it later so we get it annotated
3430
parent_keys = self._parent_map[key]
3431
for parent_key in parent_keys:
3432
if parent_key in self._num_needed_children:
3433
self._num_needed_children[parent_key] += 1
3435
self._num_needed_children[parent_key] = 1
3436
pending.update([p for p in parent_keys
3437
if p not in self._all_build_details])
3439
raise errors.RevisionNotPresent(key, self._vf)
3440
# Generally we will want to read the records in reverse order, because
3441
# we find the parent nodes after the children
3443
return records, ann_keys
3445
def _get_needed_texts(self, key, pb=None):
3446
# if True or len(self._vf._immediate_fallback_vfs) > 0:
3447
if len(self._vf._immediate_fallback_vfs) > 0:
3448
# If we have fallbacks, go to the generic path
3449
for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
3454
records, ann_keys = self._get_build_graph(key)
3455
for idx, (sub_key, text, num_lines) in enumerate(
3456
self._extract_texts(records)):
3458
pb.update(gettext('annotating'), idx, len(records))
3459
yield sub_key, text, num_lines
3460
for sub_key in ann_keys:
3461
text = self._text_cache[sub_key]
3462
num_lines = len(text) # bad assumption
3463
yield sub_key, text, num_lines
3465
except errors.RetryWithNewPacks as e:
3466
self._vf._access.reload_or_raise(e)
3467
# The cached build_details are no longer valid
3468
self._all_build_details.clear()
3470
def _cache_delta_blocks(self, key, compression_parent, delta, lines):
3471
parent_lines = self._text_cache[compression_parent]
3472
blocks = list(KnitContent.get_line_delta_blocks(
3473
delta, parent_lines, lines))
3474
self._matching_blocks[(key, compression_parent)] = blocks
3476
def _expand_record(self, key, parent_keys, compression_parent, record,
3479
        delta = None
        if compression_parent:
            if compression_parent not in self._content_objects:
                # Waiting for the parent
                self._pending_deltas.setdefault(compression_parent, []).append(
                    (key, parent_keys, record, record_details))
                return None
            # We have the basis parent, so expand the delta
            num = self._num_compression_children[compression_parent]
            num -= 1
            if num == 0:
                base_content = self._content_objects.pop(compression_parent)
                self._num_compression_children.pop(compression_parent)
            else:
                self._num_compression_children[compression_parent] = num
                base_content = self._content_objects[compression_parent]
            # It is tempting to want to copy_base_content=False for the last
            # child object. However, whenever noeol=False,
            # self._text_cache[parent_key] is content._lines. So mutating it
            # gives very bad results.
            # The alternative is to copy the lines into text cache, but then we
            # are copying anyway, so just do it here.
            content, delta = self._vf._factory.parse_record(
                key, record, record_details, base_content,
                copy_base_content=True)
        else:
            # Fulltext record
            content, _ = self._vf._factory.parse_record(
                key, record, record_details, None)
        if self._num_compression_children.get(key, 0) > 0:
            self._content_objects[key] = content
        lines = content.text()
        self._text_cache[key] = lines
        if delta is not None:
            self._cache_delta_blocks(key, compression_parent, delta, lines)
        return lines

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
                parent
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
                indicating which lines match between the two texts
        """
        block_key = (key, parent_key)
        if block_key in self._matching_blocks:
            blocks = self._matching_blocks.pop(block_key)
            parent_annotations = self._annotations_cache[parent_key]
            return parent_annotations, blocks
        return annotate.Annotator._get_parent_annotations_and_matches(
            self, key, text, parent_key)

    def _process_pending(self, key):
        """The content for 'key' was just processed.

        Determine if there is any more pending work to be processed.
        """
        to_return = []
        if key in self._pending_deltas:
            compression_parent = key
            children = self._pending_deltas.pop(key)
            for child_key, parent_keys, record, record_details in children:
                lines = self._expand_record(child_key, parent_keys,
                                            compression_parent,
                                            record, record_details)
                if self._check_ready_for_annotations(child_key, parent_keys):
                    to_return.append(child_key)
        # Also check any children that are waiting for this parent to be
        # annotation ready
        if key in self._pending_annotation:
            children = self._pending_annotation.pop(key)
            to_return.extend([c for c, p_keys in children
                              if self._check_ready_for_annotations(c, p_keys)])
        return to_return

    def _check_ready_for_annotations(self, key, parent_keys):
        """Return True if this text is ready to be yielded.

        Otherwise, this will return False, and queue the text into
        self._pending_annotation
        """
        for parent_key in parent_keys:
            if parent_key not in self._annotations_cache:
                # still waiting on at least one parent text, so queue it up
                # Note that if there are multiple parents, we need to wait
                # for all of them.
                self._pending_annotation.setdefault(parent_key,
                                                    []).append((key, parent_keys))
                return False
        return True

    def _extract_texts(self, records):
        """Extract the various texts needed based on records"""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        # Basic data flow:
        #   1) As 'records' are read, see if we can expand these records into
        #      Content objects (and thus lines)
        #   2) If a given line-delta is waiting on its compression parent, it
        #      gets queued up into self._pending_deltas, otherwise we expand
        #      it, and put it into self._text_cache and self._content_objects
        #   3) If we expanded the text, we will then check to see if all
        #      parents have also been processed. If so, this text gets yielded,
        #      else this record gets set aside into pending_annotation
        #   4) Further, if we expanded the text in (2), we will then check to
        #      see if there are any children in self._pending_deltas waiting to
        #      also be processed. If so, we go back to (2) for those
        #   5) Further again, if we yielded the text, we can then check if that
        #      'unlocks' any of the texts in pending_annotations, which should
        #      then get yielded as well
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
        # compression child could unlock yet another, and yielding a fulltext
        # will also 'unlock' the children that are waiting on that annotation.
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
        # if other parents are also waiting.)
        # We want to yield content before expanding child content objects, so
        # that we know when we can re-use the content lines, and the annotation
        # code can know when it can stop caching fulltexts, as well.

        # Children that are missing their compression parent
        pending_deltas = {}
        for (key, record, digest) in self._vf._read_records_iter(records):
            # ghosts?
            details = self._all_build_details[key]
            (_, compression_parent, parent_keys, record_details) = details
            lines = self._expand_record(key, parent_keys, compression_parent,
                                        record, record_details)
            if lines is None:
                # Pending delta should be queued up
                continue
            # At this point, we may be able to yield this content, if all
            # parents are also finished
            yield_this_text = self._check_ready_for_annotations(key,
                                                                parent_keys)
            if yield_this_text:
                # All parents present
                yield key, lines, len(lines)
            to_process = self._process_pending(key)
            while to_process:
                this_process = to_process
                to_process = []
                for key in this_process:
                    lines = self._text_cache[key]
                    yield key, lines, len(lines)
                    to_process.extend(self._process_pending(key))
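

# Prefer the compiled extension for parsing knit index data when it is
# available; otherwise fall back to the pure-Python implementation.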
try:
    from ._knit_load_data_pyx import _load_data_c as _load_data
except ImportError as e:
    osutils.failed_to_load_extension(e)
    from ._knit_load_data_py import _load_data_py as _load_data
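
# Illustrative sketch of how the annotator defined above is typically driven;
# the names `knit_vf` and `revision_key` are placeholders for this example:
#
#     annotator = _KnitAnnotator(knit_vf)  # knit_vf: a KnitVersionedFiles
#     for origin_key, line in annotator.annotate_flat(revision_key):
#         ...  # each line is paired with the key that introduced it
#
# annotate_flat() comes from the annotate.Annotator base class; it calls
# _get_needed_texts() / _extract_texts() above, which yield texts in an order
# that ensures parents are annotated before their children.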