# Copyright (C) 2006-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append-only
updates.

Knit file layout:
lifeless: the data file is made up of "delta records".  Each delta record has
a delta header that contains: (1) a version id, (2) the size of the delta (in
lines), and (3) the digest of the -expanded data- (i.e. the delta applied to
the parent).  The delta also ends with an end-marker; simply "end VERSION".

Deltas can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8 e.set('executable', 'yes')
8 if elt.get('executable') == 'yes':
8 ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

What's in an index:
09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation.

"""
from __future__ import absolute_import

from ..lazy_import import lazy_import
lazy_import(globals(), """
from breezy.bzr import (
from breezy.bzr import pack_repo
from breezy.i18n import gettext
from ..errors import (
from ..osutils import (
from ..sixish import (
from ..bzr.versionedfile import (
    AbsentContentFactory,
    VersionedFilesWithFallbacks,
# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  We may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5 * 1024 * 1024
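
# Illustrative sketch (not part of the knit implementation): the module
# docstring above describes each data-file record as a "version <id> <size>
# <sha1>" header, followed by the (possibly annotated) content lines, followed
# by an "end <id>" marker, with the whole record gzip-compressed on disk.  The
# helper below assembles such a record body from already-annotated lines; the
# exact byte layout produced by KnitVersionedFiles._record_to_data may differ
# in detail.


def _example_serialise_record(version_id, lines, digest):
    """Return a gzip-compressed knit-style record (illustrative only)."""
    import gzip
    import io
    body = b''.join(
        [b'version %s %d %s\n' % (version_id, len(lines), digest)]
        + list(lines)
        + [b'end %s\n' % version_id])
    out = io.BytesIO()
    # On disk each record is stored gzip-compressed.
    with gzip.GzipFile(fileobj=out, mode='wb') as gz:
        gz.write(body)
    return out.getvalue()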
class KnitError(InternalBzrError):

    _fmt = "Knit error"


class KnitCorrupt(KnitError):

    _fmt = "Knit %(filename)s corrupt: %(how)s"

    def __init__(self, filename, how):
        KnitError.__init__(self)
        self.filename = filename
        self.how = how


class SHA1KnitCorrupt(KnitCorrupt):

    _fmt = ("Knit %(filename)s corrupt: sha-1 of reconstructed text does not "
            "match expected sha-1. key %(key)s expected sha %(expected)s actual "
            "sha %(actual)s")

    def __init__(self, filename, actual, expected, key, content):
        KnitError.__init__(self)
        self.filename = filename
        self.actual = actual
        self.expected = expected
        self.key = key
        self.content = content


class KnitDataStreamIncompatible(KnitError):
    # Not raised anymore, as we can convert data streams.  In future we may
    # need it again for more exotic cases, so we're keeping it around for now.

    _fmt = "Cannot insert knit data stream of format \"%(stream_format)s\" into knit of format \"%(target_format)s\"."

    def __init__(self, stream_format, target_format):
        self.stream_format = stream_format
        self.target_format = target_format


class KnitDataStreamUnknown(KnitError):
    # Indicates a data stream we don't know how to handle.

    _fmt = "Cannot parse knit data stream of format \"%(stream_format)s\"."

    def __init__(self, stream_format):
        self.stream_format = stream_format


class KnitHeaderError(KnitError):

    _fmt = 'Knit header error: %(badline)r unexpected for file "%(filename)s".'

    def __init__(self, badline, filename):
        KnitError.__init__(self)
        self.badline = badline
        self.filename = filename


class KnitIndexUnknownMethod(KnitError):
    """Raised when we don't understand the storage method.

    Currently only 'fulltext' and 'line-delta' are supported.
    """

    _fmt = ("Knit index %(filename)s does not have a known method"
            " in options: %(options)r")

    def __init__(self, filename, options):
        KnitError.__init__(self)
        self.filename = filename
        self.options = options
class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = KnitVersionedFiles(None, None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(
            (rec[1],), rec[3], content.text())
        return bytes


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
        return bytes


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to full texts."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(
            factory.key[-1], contents, factory._build_details, None)
        return b''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to full texts."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
                                                        plain=True)
        compression_parent = factory.parents[0]
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_chunks = basis_entry.get_bytes_as('chunked')
        basis_lines = osutils.chunks_to_lines(basis_chunks)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return b''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to full texts."""

    def get_bytes(self, factory):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(
            factory.key[-1], contents, factory._build_details, None)
        return b''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to full texts."""

    def get_bytes(self, factory):
        compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0]
        # XXX: string splitting overhead.
        basis_entry = next(self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True))
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_chunks = basis_entry.get_bytes_as('chunked')
        basis_lines = osutils.chunks_to_lines(basis_chunks)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Manually apply the delta because we have one plain basis content
        # and a plain delta.
        content, _ = self._plain_factory.parse_record(
            rec[1], contents, factory._build_details, basis_content)
        return b''.join(content.text())
322
class KnitContentFactory(ContentFactory):
323
"""Content factory for streaming from knits.
325
:seealso ContentFactory:
328
def __init__(self, key, parents, build_details, sha1, raw_record,
329
annotated, knit=None, network_bytes=None):
330
"""Create a KnitContentFactory for key.
333
:param parents: The parents.
334
:param build_details: The build details as returned from
336
:param sha1: The sha1 expected from the full text of this object.
337
:param raw_record: The bytes of the knit data from disk.
338
:param annotated: True if the raw data is annotated.
339
:param network_bytes: None to calculate the network bytes on demand,
340
not-none if they are already known.
342
ContentFactory.__init__(self)
345
self.parents = parents
346
if build_details[0] == 'line-delta':
351
annotated_kind = 'annotated-'
354
self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
355
self._raw_record = raw_record
356
self._network_bytes = network_bytes
357
self._build_details = build_details
360
def _create_network_bytes(self):
361
"""Create a fully serialised network version for transmission."""
362
# storage_kind, key, parents, Noeol, raw_record
363
key_bytes = b'\x00'.join(self.key)
364
if self.parents is None:
365
parent_bytes = b'None:'
367
parent_bytes = b'\t'.join(b'\x00'.join(key)
368
for key in self.parents)
369
if self._build_details[1]:
373
network_bytes = b"%s\n%s\n%s\n%s%s" % (
374
self.storage_kind.encode('ascii'), key_bytes,
375
parent_bytes, noeol, self._raw_record)
376
self._network_bytes = network_bytes
378
def get_bytes_as(self, storage_kind):
379
if storage_kind == self.storage_kind:
380
if self._network_bytes is None:
381
self._create_network_bytes()
382
return self._network_bytes
383
if ('-ft-' in self.storage_kind
384
and storage_kind in ('chunked', 'fulltext')):
385
adapter_key = (self.storage_kind, 'fulltext')
386
adapter_factory = adapter_registry.get(adapter_key)
387
adapter = adapter_factory(None)
388
bytes = adapter.get_bytes(self)
389
if storage_kind == 'chunked':
393
if self._knit is not None:
394
            # Not redundant with direct conversion above - that only handles
            # fulltext cases.
396
if storage_kind == 'chunked':
397
return self._knit.get_lines(self.key[0])
398
elif storage_kind == 'fulltext':
399
return self._knit.get_text(self.key[0])
400
raise errors.UnavailableRepresentation(self.key, storage_kind,
404
class LazyKnitContentFactory(ContentFactory):
405
"""A ContentFactory which can either generate full text or a wire form.
407
:seealso ContentFactory:
410
def __init__(self, key, parents, generator, first):
411
"""Create a LazyKnitContentFactory.
413
:param key: The key of the record.
414
:param parents: The parents of the record.
415
:param generator: A _ContentMapGenerator containing the record for this
417
:param first: Is this the first content object returned from generator?
418
if it is, its storage kind is knit-delta-closure, otherwise it is
419
knit-delta-closure-ref
422
self.parents = parents
424
self._generator = generator
425
self.storage_kind = "knit-delta-closure"
427
self.storage_kind = self.storage_kind + "-ref"
430
def get_bytes_as(self, storage_kind):
431
if storage_kind == self.storage_kind:
433
return self._generator._wire_bytes()
435
# all the keys etc are contained in the bytes returned in the
438
if storage_kind in ('chunked', 'fulltext'):
439
chunks = self._generator._get_one_work(self.key).text()
440
if storage_kind == 'chunked':
443
return b''.join(chunks)
444
raise errors.UnavailableRepresentation(self.key, storage_kind,
448
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
449
"""Convert a network record to a iterator over stream records.
451
:param storage_kind: The storage kind of the record.
452
Must be 'knit-delta-closure'.
453
:param bytes: The bytes of the record on the network.
455
generator = _NetworkContentMapGenerator(bytes, line_end)
456
return generator.get_record_stream()
459
def knit_network_to_record(storage_kind, bytes, line_end):
460
"""Convert a network record to a record object.
462
:param storage_kind: The storage kind of the record.
463
:param bytes: The bytes of the record on the network.
466
line_end = bytes.find(b'\n', start)
467
key = tuple(bytes[start:line_end].split(b'\x00'))
469
line_end = bytes.find(b'\n', start)
470
parent_line = bytes[start:line_end]
471
if parent_line == b'None:':
475
[tuple(segment.split(b'\x00')) for segment in parent_line.split(b'\t')
478
noeol = bytes[start:start + 1] == b'N'
479
if 'ft' in storage_kind:
482
method = 'line-delta'
483
build_details = (method, noeol)
485
raw_record = bytes[start:]
486
annotated = 'annotated' in storage_kind
487
return [KnitContentFactory(key, parents, build_details, None, raw_record,
488
annotated, network_bytes=bytes)]
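
# Illustrative sketch (not part of the knit API): knit_network_to_record above
# parses the wire form built by KnitContentFactory._create_network_bytes,
# which is "storage_kind\nkey\nparents\n" followed by a one-byte no-eol flag
# and then the raw gzipped knit record.  Key elements are joined with NUL
# bytes and parent keys with tabs.  The single-space padding used when no-eol
# is False is an assumption inferred from the b'N' check above.


def _example_build_network_bytes(storage_kind, key, parents, noeol, raw_record):
    """Assemble wire bytes in the layout described above (illustrative only)."""
    key_bytes = b'\x00'.join(key)
    if parents is None:
        parent_bytes = b'None:'
    else:
        parent_bytes = b'\t'.join(b'\x00'.join(parent) for parent in parents)
    flag = b'N' if noeol else b' '
    return b'%s\n%s\n%s\n%s%s' % (
        storage_kind.encode('ascii'), key_bytes, parent_bytes, flag, raw_record)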
491
class KnitContent(object):
492
"""Content of a knit version to which deltas can be applied.
494
This is always stored in memory as a list of lines with \\n at the end,
495
plus a flag saying if the final ending is really there or not, because that
496
corresponds to the on-disk knit representation.
500
self._should_strip_eol = False
502
def apply_delta(self, delta, new_version_id):
503
"""Apply delta to this object to become new_version_id."""
504
raise NotImplementedError(self.apply_delta)
506
def line_delta_iter(self, new_lines):
507
"""Generate line-based delta from this content to new_lines."""
508
new_texts = new_lines.text()
509
old_texts = self.text()
510
s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
511
for tag, i1, i2, j1, j2 in s.get_opcodes():
514
# ofrom, oto, length, data
515
yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
517
def line_delta(self, new_lines):
518
return list(self.line_delta_iter(new_lines))
521
def get_line_delta_blocks(knit_delta, source, target):
522
"""Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
523
target_len = len(target)
526
for s_begin, s_end, t_len, new_text in knit_delta:
527
true_n = s_begin - s_pos
530
# knit deltas do not provide reliable info about whether the
531
# last line of a file matches, due to eol handling.
532
if source[s_pos + n - 1] != target[t_pos + n - 1]:
535
yield s_pos, t_pos, n
536
t_pos += t_len + true_n
538
n = target_len - t_pos
540
if source[s_pos + n - 1] != target[t_pos + n - 1]:
543
yield s_pos, t_pos, n
544
yield s_pos + (target_len - t_pos), target_len, 0
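
# Illustrative sketch (not part of KnitContent): a knit line-delta is a list
# of (start, end, count, lines) hunks, where the new lines replace
# old[start:end] and count == len(lines).  Applying the hunks in order means
# tracking the offset introduced by earlier hunks, exactly as the apply_delta
# implementations below do.


def _example_apply_line_delta(old_lines, delta):
    """Return a copy of old_lines with the delta hunks applied (illustrative)."""
    lines = list(old_lines)
    offset = 0
    for start, end, count, new_lines in delta:
        lines[offset + start:offset + end] = new_lines
        offset = offset + (start - end) + count
    return lines


# For example, replacing the middle line of a three-line text:
#   _example_apply_line_delta([b'a\n', b'b\n', b'c\n'], [(1, 2, 1, [b'B\n'])])
#   == [b'a\n', b'B\n', b'c\n']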
547
class AnnotatedKnitContent(KnitContent):
548
"""Annotated content."""
550
def __init__(self, lines):
551
KnitContent.__init__(self)
552
self._lines = list(lines)
555
"""Return a list of (origin, text) for each content line."""
556
lines = self._lines[:]
557
if self._should_strip_eol:
558
origin, last_line = lines[-1]
559
lines[-1] = (origin, last_line.rstrip(b'\n'))
562
def apply_delta(self, delta, new_version_id):
563
"""Apply delta to this object to become new_version_id."""
566
for start, end, count, delta_lines in delta:
567
lines[offset + start:offset + end] = delta_lines
568
offset = offset + (start - end) + count
572
lines = [text for origin, text in self._lines]
573
except ValueError as e:
574
# most commonly (only?) caused by the internal form of the knit
575
# missing annotation information because of a bug - see thread
577
raise KnitCorrupt(self,
578
"line in annotated knit missing annotation information: %s"
580
if self._should_strip_eol:
581
lines[-1] = lines[-1].rstrip(b'\n')
585
return AnnotatedKnitContent(self._lines)
588
class PlainKnitContent(KnitContent):
589
"""Unannotated content.
591
    When annotate[_iter] is called on this content, the same version is reported
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
    objects.
    """
596
def __init__(self, lines, version_id):
597
KnitContent.__init__(self)
599
self._version_id = version_id
602
"""Return a list of (origin, text) for each content line."""
603
return [(self._version_id, line) for line in self._lines]
605
def apply_delta(self, delta, new_version_id):
606
"""Apply delta to this object to become new_version_id."""
609
for start, end, count, delta_lines in delta:
610
lines[offset + start:offset + end] = delta_lines
611
offset = offset + (start - end) + count
612
self._version_id = new_version_id
615
return PlainKnitContent(self._lines[:], self._version_id)
619
if self._should_strip_eol:
621
lines[-1] = lines[-1].rstrip(b'\n')
625
class _KnitFactory(object):
626
"""Base class for common Factory functions."""
628
def parse_record(self, version_id, record, record_details,
629
base_content, copy_base_content=True):
630
"""Parse a record into a full content object.
632
:param version_id: The official version id for this content
633
:param record: The data returned by read_records_iter()
634
:param record_details: Details about the record returned by
636
:param base_content: If get_build_details returns a compression_parent,
637
you must return a base_content here, else use None
638
:param copy_base_content: When building from the base_content, decide
639
you can either copy it and return a new object, or modify it in
641
:return: (content, delta) A Content object and possibly a line-delta,
644
method, noeol = record_details
645
if method == 'line-delta':
646
if copy_base_content:
647
content = base_content.copy()
649
content = base_content
650
delta = self.parse_line_delta(record, version_id)
651
content.apply_delta(delta, version_id)
653
content = self.parse_fulltext(record, version_id)
655
content._should_strip_eol = noeol
656
return (content, delta)
659
class KnitAnnotateFactory(_KnitFactory):
660
"""Factory for creating annotated Content objects."""
664
def make(self, lines, version_id):
665
num_lines = len(lines)
666
return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
668
def parse_fulltext(self, content, version_id):
669
"""Convert fulltext to internal representation
671
fulltext content is of the format
672
revid(utf8) plaintext\n
673
internal representation is of the format:
676
# TODO: jam 20070209 The tests expect this to be returned as tuples,
677
# but the code itself doesn't really depend on that.
678
# Figure out a way to not require the overhead of turning the
679
# list back into tuples.
680
lines = (tuple(line.split(b' ', 1)) for line in content)
681
return AnnotatedKnitContent(lines)
683
def parse_line_delta_iter(self, lines):
684
return iter(self.parse_line_delta(lines))
686
def parse_line_delta(self, lines, version_id, plain=False):
687
"""Convert a line based delta into internal representation.
689
line delta is in the form of:
690
intstart intend intcount
692
revid(utf8) newline\n
693
internal representation is
694
(start, end, count, [1..count tuples (revid, newline)])
696
:param plain: If True, the lines are returned as a plain
697
list without annotations, not as a list of (origin, content) tuples, i.e.
698
(start, end, count, [1..count newline])
705
def cache_and_return(line):
706
origin, text = line.split(b' ', 1)
707
return cache.setdefault(origin, origin), text
709
# walk through the lines parsing.
710
# Note that the plain test is explicitly pulled out of the
711
# loop to minimise any performance impact
714
start, end, count = [int(n) for n in header.split(b',')]
715
contents = [next(lines).split(b' ', 1)[1]
716
for _ in range(count)]
717
result.append((start, end, count, contents))
720
start, end, count = [int(n) for n in header.split(b',')]
721
contents = [tuple(next(lines).split(b' ', 1))
722
for _ in range(count)]
723
result.append((start, end, count, contents))
726
def get_fulltext_content(self, lines):
727
"""Extract just the content lines from a fulltext."""
728
return (line.split(b' ', 1)[1] for line in lines)
730
def get_linedelta_content(self, lines):
731
"""Extract just the content from a line delta.
733
This doesn't return all of the extra information stored in a delta.
734
Only the actual content lines.
738
header = header.split(b',')
739
count = int(header[2])
740
for _ in range(count):
741
origin, text = next(lines).split(b' ', 1)
744
def lower_fulltext(self, content):
745
"""convert a fulltext content record into a serializable form.
747
see parse_fulltext which this inverts.
749
return [b'%s %s' % (o, t) for o, t in content._lines]
751
def lower_line_delta(self, delta):
752
"""convert a delta into a serializable form.
754
See parse_line_delta which this inverts.
756
# TODO: jam 20070209 We only do the caching thing to make sure that
757
# the origin is a valid utf-8 line, eventually we could remove it
759
for start, end, c, lines in delta:
760
out.append(b'%d,%d,%d\n' % (start, end, c))
761
out.extend(origin + b' ' + text
762
for origin, text in lines)
765
def annotate(self, knit, key):
766
content = knit._get_content(key)
767
# adjust for the fact that serialised annotations are only key suffixes
769
if isinstance(key, tuple):
771
origins = content.annotate()
773
for origin, line in origins:
774
result.append((prefix + (origin,), line))
777
# XXX: This smells a bit. Why would key ever be a non-tuple here?
778
# Aren't keys defined to be tuples? -- spiv 20080618
779
return content.annotate()
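
# Illustrative sketch (not part of KnitAnnotateFactory): the annotated
# fulltext form stores each line as "<origin> <text>", which lower_fulltext
# produces and parse_fulltext splits back into (origin, text) pairs with
# line.split(b' ', 1).  A round trip on plain lists of bytes looks like this:


def _example_annotated_roundtrip(annotated_lines):
    """Serialise and re-parse (origin, text) pairs (illustrative only)."""
    serialised = [b'%s %s' % (origin, text) for origin, text in annotated_lines]
    reparsed = [tuple(line.split(b' ', 1)) for line in serialised]
    return serialised, reparsed


# _example_annotated_roundtrip([(b'rev-1', b'hello\n'), (b'rev-2', b'world\n')])
# returns ([b'rev-1 hello\n', b'rev-2 world\n'],
#          [(b'rev-1', b'hello\n'), (b'rev-2', b'world\n')])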
782
class KnitPlainFactory(_KnitFactory):
783
"""Factory for creating plain Content objects."""
787
def make(self, lines, version_id):
788
return PlainKnitContent(lines, version_id)
790
def parse_fulltext(self, content, version_id):
791
"""This parses an unannotated fulltext.
793
        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
796
return self.make(content, version_id)
798
def parse_line_delta_iter(self, lines, version_id):
800
num_lines = len(lines)
801
while cur < num_lines:
804
start, end, c = [int(n) for n in header.split(b',')]
805
yield start, end, c, lines[cur:cur + c]
808
def parse_line_delta(self, lines, version_id):
809
return list(self.parse_line_delta_iter(lines, version_id))
811
def get_fulltext_content(self, lines):
812
"""Extract just the content lines from a fulltext."""
815
def get_linedelta_content(self, lines):
816
"""Extract just the content from a line delta.
818
This doesn't return all of the extra information stored in a delta.
819
Only the actual content lines.
823
header = header.split(b',')
824
count = int(header[2])
825
for _ in range(count):
828
def lower_fulltext(self, content):
829
return content.text()
831
def lower_line_delta(self, delta):
833
for start, end, c, lines in delta:
834
out.append(b'%d,%d,%d\n' % (start, end, c))
838
def annotate(self, knit, key):
839
annotator = _KnitAnnotator(knit)
840
return annotator.annotate_flat(key)
843
def make_file_factory(annotated, mapper):
844
"""Create a factory for creating a file based KnitVersionedFiles.
846
    This is only functional enough to run interface tests; it doesn't try to
    provide a full pack environment.
849
:param annotated: knit annotations are wanted.
850
:param mapper: The mapper from keys to paths.
852
def factory(transport):
853
index = _KndxIndex(transport, mapper, lambda: None,
854
lambda: True, lambda: True)
855
access = _KnitKeyAccess(transport, mapper)
856
return KnitVersionedFiles(index, access, annotated=annotated)
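
# Illustrative sketch of how make_file_factory might be driven from a test.
# The MemoryTransport and ConstantMapper import locations are assumptions
# (breezy.transport.memory and breezy.bzr.versionedfile respectively), not
# something this module guarantees.


def _example_make_file_knit():
    """Build a throwaway file-backed knit in memory (illustrative only)."""
    from breezy.transport.memory import MemoryTransport
    from breezy.bzr.versionedfile import ConstantMapper
    factory = make_file_factory(annotated=False, mapper=ConstantMapper('source'))
    vf = factory(MemoryTransport())
    # With a ConstantMapper the keys are one-element tuples.
    vf.add_lines((b'rev-1',), (), [b'hello\n'])
    return vf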
860
def make_pack_factory(graph, delta, keylength):
861
"""Create a factory for creating a pack based VersionedFiles.
863
    This is only functional enough to run interface tests; it doesn't try to
    provide a full pack environment.
866
:param graph: Store a graph.
867
:param delta: Delta compress contents.
868
    :param keylength: How long keys should be.
870
def factory(transport):
871
parents = graph or delta
877
max_delta_chain = 200
880
graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
881
key_elements=keylength)
882
stream = transport.open_write_stream('newpack')
883
writer = pack.ContainerWriter(stream.write)
885
index = _KnitGraphIndex(graph_index, lambda: True, parents=parents,
886
deltas=delta, add_callback=graph_index.add_nodes)
887
access = pack_repo._DirectPackAccess({})
888
access.set_writer(writer, graph_index, (transport, 'newpack'))
889
result = KnitVersionedFiles(index, access,
890
max_delta_chain=max_delta_chain)
891
result.stream = stream
892
result.writer = writer
897
def cleanup_pack_knit(versioned_files):
898
versioned_files.stream.close()
899
versioned_files.writer.end()
902
def _get_total_build_size(self, keys, positions):
903
"""Determine the total bytes to build these keys.
905
(helper function because _KnitGraphIndex and _KndxIndex work the same, but
906
don't inherit from a common base.)
908
:param keys: Keys that we want to build
909
    :param positions: dict of {key: (info, index_memo, comp_parent)} (such
910
as returned by _get_components_positions)
911
:return: Number of bytes to build those keys
913
all_build_index_memos = {}
917
for key in build_keys:
918
# This is mostly for the 'stacked' case
919
# Where we will be getting the data from a fallback
920
if key not in positions:
922
_, index_memo, compression_parent = positions[key]
923
all_build_index_memos[key] = index_memo
924
if compression_parent not in all_build_index_memos:
925
next_keys.add(compression_parent)
926
build_keys = next_keys
927
return sum(index_memo[2]
928
for index_memo in viewvalues(all_build_index_memos))
931
class KnitVersionedFiles(VersionedFilesWithFallbacks):
932
"""Storage for many versioned files using knit compression.
934
Backend storage is managed by indices and data objects.
936
:ivar _index: A _KnitGraphIndex or similar that can describe the
937
parents, graph, compression and data location of entries in this
938
KnitVersionedFiles. Note that this is only the index for
939
*this* vfs; if there are fallbacks they must be queried separately.
942
def __init__(self, index, data_access, max_delta_chain=200,
943
annotated=False, reload_func=None):
944
"""Create a KnitVersionedFiles with index and data_access.
946
:param index: The index for the knit data.
947
:param data_access: The access object to store and retrieve knit
949
:param max_delta_chain: The maximum number of deltas to permit during
950
insertion. Set to 0 to prohibit the use of deltas.
951
:param annotated: Set to True to cause annotations to be calculated and
952
stored during insertion.
953
        :param reload_func: A function that can be called if we think we need
954
to reload the pack listing and try again. See
955
'breezy.bzr.pack_repo.AggregateIndex' for the signature.
958
self._access = data_access
959
self._max_delta_chain = max_delta_chain
961
self._factory = KnitAnnotateFactory()
963
self._factory = KnitPlainFactory()
964
self._immediate_fallback_vfs = []
965
self._reload_func = reload_func
968
return "%s(%r, %r)" % (
969
self.__class__.__name__,
973
def without_fallbacks(self):
974
"""Return a clone of this object without any fallbacks configured."""
975
return KnitVersionedFiles(self._index, self._access,
976
self._max_delta_chain, self._factory.annotated,
979
def add_fallback_versioned_files(self, a_versioned_files):
980
"""Add a source of texts for texts not present in this knit.
982
:param a_versioned_files: A VersionedFiles object.
984
self._immediate_fallback_vfs.append(a_versioned_files)
986
def add_lines(self, key, parents, lines, parent_texts=None,
987
left_matching_blocks=None, nostore_sha=None, random_id=False,
989
"""See VersionedFiles.add_lines()."""
990
self._index._check_write_ok()
991
self._check_add(key, lines, random_id, check_content)
993
# The caller might pass None if there is no graph data, but kndx
994
# indexes can't directly store that, so we give them
995
# an empty tuple instead.
997
line_bytes = b''.join(lines)
998
return self._add(key, lines, parents,
999
parent_texts, left_matching_blocks, nostore_sha, random_id,
1000
line_bytes=line_bytes)
1002
def _add(self, key, lines, parents, parent_texts,
1003
left_matching_blocks, nostore_sha, random_id,
1005
"""Add a set of lines on top of version specified by parents.
1007
Any versions not present will be converted into ghosts.
1009
        :param lines: A list of strings where each one is a single line (has a
            single newline at the end of the string). This is now optional
            (callers can pass None). It is left in its place for backwards
            compatibility; it should hold that b''.join(lines) == line_bytes.
        :param line_bytes: A single string containing the content
1015
We pass both lines and line_bytes because different routes bring the
1016
values to this function. And for memory efficiency, we don't want to
1017
have to split/join on-demand.
1019
# first thing, if the content is something we don't need to store, find
1021
digest = sha_string(line_bytes)
1022
if nostore_sha == digest:
1023
raise errors.ExistingContent
1025
present_parents = []
1026
if parent_texts is None:
1028
# Do a single query to ascertain parent presence; we only compress
1029
# against parents in the same kvf.
1030
present_parent_map = self._index.get_parent_map(parents)
1031
for parent in parents:
1032
if parent in present_parent_map:
1033
present_parents.append(parent)
1035
# Currently we can only compress against the left most present parent.
1036
if (len(present_parents) == 0
1037
or present_parents[0] != parents[0]):
1040
# To speed the extract of texts the delta chain is limited
1041
# to a fixed number of deltas. This should minimize both
1042
# I/O and the time spend applying deltas.
1043
delta = self._check_should_delta(present_parents[0])
1045
text_length = len(line_bytes)
1048
# Note: line_bytes is not modified to add a newline, that is tracked
1049
# via the no_eol flag. 'lines' *is* modified, because that is the
1050
# general values needed by the Content code.
1051
if line_bytes and not line_bytes.endswith(b'\n'):
1052
options.append(b'no-eol')
1054
# Copy the existing list, or create a new one
1056
lines = osutils.split_lines(line_bytes)
1059
# Replace the last line with one that ends in a final newline
1060
lines[-1] = lines[-1] + b'\n'
1062
lines = osutils.split_lines(line_bytes)
1064
for element in key[:-1]:
1065
if not isinstance(element, bytes):
1066
raise TypeError("key contains non-bytestrings: %r" % (key,))
1068
key = key[:-1] + (b'sha1:' + digest,)
1069
elif not isinstance(key[-1], bytes):
1070
raise TypeError("key contains non-bytestrings: %r" % (key,))
1071
# Knit hunks are still last-element only
1072
version_id = key[-1]
1073
content = self._factory.make(lines, version_id)
1075
# Hint to the content object that its text() call should strip the
1077
content._should_strip_eol = True
1078
if delta or (self._factory.annotated and len(present_parents) > 0):
1079
# Merge annotations from parent texts if needed.
1080
delta_hunks = self._merge_annotations(content, present_parents,
1081
parent_texts, delta, self._factory.annotated,
1082
left_matching_blocks)
1085
options.append(b'line-delta')
1086
store_lines = self._factory.lower_line_delta(delta_hunks)
1087
size, data = self._record_to_data(key, digest,
1090
options.append(b'fulltext')
1091
# isinstance is slower and we have no hierarchy.
1092
if self._factory.__class__ is KnitPlainFactory:
1093
# Use the already joined bytes saving iteration time in
1095
dense_lines = [line_bytes]
1097
dense_lines.append(b'\n')
1098
size, data = self._record_to_data(key, digest,
1101
# get mixed annotation + content and feed it into the
1103
store_lines = self._factory.lower_fulltext(content)
1104
size, data = self._record_to_data(key, digest,
1107
access_memo = self._access.add_raw_records([(key, size)], data)[0]
1108
self._index.add_records(
1109
((key, options, access_memo, parents),),
1110
random_id=random_id)
1111
return digest, text_length, content
1113
def annotate(self, key):
1114
"""See VersionedFiles.annotate."""
1115
return self._factory.annotate(self, key)
1117
def get_annotator(self):
1118
return _KnitAnnotator(self)
1120
def check(self, progress_bar=None, keys=None):
1121
"""See VersionedFiles.check()."""
1123
return self._logical_check()
1125
        # At the moment, check does no extra work over get_record_stream
1126
return self.get_record_stream(keys, 'unordered', True)
1128
def _logical_check(self):
1129
# This doesn't actually test extraction of everything, but that will
1130
# impact 'bzr check' substantially, and needs to be integrated with
1131
# care. However, it does check for the obvious problem of a delta with
1133
keys = self._index.keys()
1134
parent_map = self.get_parent_map(keys)
1136
if self._index.get_method(key) != 'fulltext':
1137
compression_parent = parent_map[key][0]
1138
if compression_parent not in parent_map:
1139
raise KnitCorrupt(self,
1140
"Missing basis parent %s for %s" % (
1141
compression_parent, key))
1142
for fallback_vfs in self._immediate_fallback_vfs:
1143
fallback_vfs.check()
1145
def _check_add(self, key, lines, random_id, check_content):
1146
"""check that version_id and lines are safe to add."""
1147
if not all(isinstance(x, bytes) or x is None for x in key):
1148
raise TypeError(key)
1149
version_id = key[-1]
1150
if version_id is not None:
1151
if contains_whitespace(version_id):
1152
raise InvalidRevisionId(version_id, self)
1153
self.check_not_reserved_id(version_id)
1154
# TODO: If random_id==False and the key is already present, we should
1155
# probably check that the existing content is identical to what is
1156
# being inserted, and otherwise raise an exception. This would make
1157
# the bundle code simpler.
1159
self._check_lines_not_unicode(lines)
1160
self._check_lines_are_lines(lines)
1162
def _check_header(self, key, line):
1163
rec = self._split_header(line)
1164
self._check_header_version(rec, key[-1])
1167
def _check_header_version(self, rec, version_id):
1168
"""Checks the header version on original format knit records.
1170
These have the last component of the key embedded in the record.
1172
if rec[1] != version_id:
1173
raise KnitCorrupt(self,
1174
'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
1176
def _check_should_delta(self, parent):
1177
"""Iterate back through the parent listing, looking for a fulltext.
1179
This is used when we want to decide whether to add a delta or a new
1180
fulltext. It searches for _max_delta_chain parents. When it finds a
1181
fulltext parent, it sees if the total size of the deltas leading up to
1182
it is large enough to indicate that we want a new full text anyway.
1184
Return True if we should create a new delta, False if we should use a
1188
fulltext_size = None
1189
for count in range(self._max_delta_chain):
1191
# Note that this only looks in the index of this particular
1192
# KnitVersionedFiles, not in the fallbacks. This ensures that
1193
# we won't store a delta spanning physical repository
1195
build_details = self._index.get_build_details([parent])
1196
parent_details = build_details[parent]
1197
except (RevisionNotPresent, KeyError) as e:
1198
# Some basis is not locally present: always fulltext
1200
index_memo, compression_parent, _, _ = parent_details
1201
_, _, size = index_memo
1202
if compression_parent is None:
1203
fulltext_size = size
1206
# We don't explicitly check for presence because this is in an
1207
# inner loop, and if it's missing it'll fail anyhow.
1208
parent = compression_parent
1210
# We couldn't find a fulltext, so we must create a new one
1212
        # Simple heuristic - if the total I/O would be greater as a delta than
1213
# the originally installed fulltext, we create a new fulltext.
1214
return fulltext_size > delta_size
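
    # Worked example for _check_should_delta (illustrative, not executed):
    # suppose the proposed basis "parent" is itself a 10-byte delta, its
    # compression parent is a 20-byte delta, and that record's parent is a
    # 200-byte fulltext.  The existing chain costs 10 + 20 = 30 bytes of delta
    # I/O, which is less than the 200-byte fulltext, so the method returns
    # True and a new delta is stored.  If the chain's deltas summed to more
    # than the fulltext size, it would return False and a fresh fulltext would
    # be written instead.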
1216
def _build_details_to_components(self, build_details):
1217
"""Convert a build_details tuple to a position tuple."""
1218
# record_details, access_memo, compression_parent
1219
return build_details[3], build_details[0], build_details[1]
1221
def _get_components_positions(self, keys, allow_missing=False):
1222
"""Produce a map of position data for the components of keys.
1224
This data is intended to be used for retrieving the knit records.
1226
A dict of key to (record_details, index_memo, next, parents) is
1229
* method is the way referenced data should be applied.
1230
* index_memo is the handle to pass to the data access to actually get
1232
* next is the build-parent of the version, or None for fulltexts.
1233
* parents is the version_ids of the parents of this version
1235
:param allow_missing: If True do not raise an error on a missing
1236
component, just ignore it.
1239
pending_components = keys
1240
while pending_components:
1241
build_details = self._index.get_build_details(pending_components)
1242
current_components = set(pending_components)
1243
pending_components = set()
1244
for key, details in viewitems(build_details):
1245
(index_memo, compression_parent, parents,
1246
record_details) = details
1247
if compression_parent is not None:
1248
pending_components.add(compression_parent)
1249
component_data[key] = self._build_details_to_components(
1251
missing = current_components.difference(build_details)
1252
if missing and not allow_missing:
1253
raise errors.RevisionNotPresent(missing.pop(), self)
1254
return component_data
1256
def _get_content(self, key, parent_texts={}):
1257
"""Returns a content object that makes up the specified
1259
cached_version = parent_texts.get(key, None)
1260
if cached_version is not None:
1261
# Ensure the cache dict is valid.
1262
if not self.get_parent_map([key]):
1263
raise RevisionNotPresent(key, self)
1264
return cached_version
1265
generator = _VFContentMapGenerator(self, [key])
1266
return generator._get_content(key)
1268
def get_parent_map(self, keys):
1269
"""Get a map of the graph parents of keys.
1271
:param keys: The keys to look up parents for.
1272
:return: A mapping from keys to parents. Absent keys are absent from
1275
return self._get_parent_map_with_sources(keys)[0]
1277
def _get_parent_map_with_sources(self, keys):
1278
"""Get a map of the parents of keys.
1280
:param keys: The keys to look up parents for.
1281
:return: A tuple. The first element is a mapping from keys to parents.
1282
Absent keys are absent from the mapping. The second element is a
1283
list with the locations each key was found in. The first element
1284
is the in-this-knit parents, the second the first fallback source,
1288
sources = [self._index] + self._immediate_fallback_vfs
1291
for source in sources:
1294
new_result = source.get_parent_map(missing)
1295
source_results.append(new_result)
1296
result.update(new_result)
1297
missing.difference_update(set(new_result))
1298
return result, source_results
1300
def _get_record_map(self, keys, allow_missing=False):
1301
"""Produce a dictionary of knit records.
1303
:return: {key:(record, record_details, digest, next)}
1305
            * record: data returned from read_records (a KnitContent object)
1306
* record_details: opaque information to pass to parse_record
1307
* digest: SHA1 digest of the full text after all steps are done
1308
* next: build-parent of the version, i.e. the leftmost ancestor.
1309
Will be None if the record is not a delta.
1311
:param keys: The keys to build a map for
1312
:param allow_missing: If some records are missing, rather than
1313
error, just return the data that could be generated.
1315
raw_map = self._get_record_map_unparsed(keys,
1316
allow_missing=allow_missing)
1317
return self._raw_map_to_record_map(raw_map)
1319
def _raw_map_to_record_map(self, raw_map):
1320
"""Parse the contents of _get_record_map_unparsed.
1322
:return: see _get_record_map.
1326
data, record_details, next = raw_map[key]
1327
content, digest = self._parse_record(key[-1], data)
1328
result[key] = content, record_details, digest, next
1331
def _get_record_map_unparsed(self, keys, allow_missing=False):
1332
"""Get the raw data for reconstructing keys without parsing it.
1334
:return: A dict suitable for parsing via _raw_map_to_record_map.
1335
key-> raw_bytes, (method, noeol), compression_parent
1337
# This retries the whole request if anything fails. Potentially we
1338
# could be a bit more selective. We could track the keys whose records
1339
# we have successfully found, and then only request the new records
1340
# from there. However, _get_components_positions grabs the whole build
1341
# chain, which means we'll likely try to grab the same records again
1342
# anyway. Also, can the build chains change as part of a pack
1343
# operation? We wouldn't want to end up with a broken chain.
1346
position_map = self._get_components_positions(keys,
1347
allow_missing=allow_missing)
1348
# key = component_id, r = record_details, i_m = index_memo,
1350
records = [(key, i_m) for key, (r, i_m, n)
1351
in viewitems(position_map)]
1352
# Sort by the index memo, so that we request records from the
1353
# same pack file together, and in forward-sorted order
1354
records.sort(key=operator.itemgetter(1))
1356
for key, data in self._read_records_iter_unchecked(records):
1357
(record_details, index_memo, next) = position_map[key]
1358
raw_record_map[key] = data, record_details, next
1359
return raw_record_map
1360
except errors.RetryWithNewPacks as e:
1361
self._access.reload_or_raise(e)
1364
def _split_by_prefix(cls, keys):
1365
"""For the given keys, split them up based on their prefix.
1367
To keep memory pressure somewhat under control, split the
1368
requests back into per-file-id requests, otherwise "bzr co"
1369
extracts the full tree into memory before writing it to disk.
1370
This should be revisited if _get_content_maps() can ever cross
1373
The keys for a given file_id are kept in the same relative order.
1374
        Ordering between file_ids is not preserved, though prefix_order will
        return the order in which each prefix was first seen.
1377
:param keys: An iterable of key tuples
1378
:return: (split_map, prefix_order)
1379
split_map A dictionary mapping prefix => keys
1380
prefix_order The order that we saw the various prefixes
1382
split_by_prefix = {}
1390
if prefix in split_by_prefix:
1391
split_by_prefix[prefix].append(key)
1393
split_by_prefix[prefix] = [key]
1394
prefix_order.append(prefix)
1395
return split_by_prefix, prefix_order
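
    # Illustrative example for _split_by_prefix (not executed): assuming the
    # prefix is the file-id element of a (file_id, revision_id) key, then for
    #   keys = [(b'f1', b'r1'), (b'f2', b'r1'), (b'f1', b'r2')]
    # the split map groups keys per file id while prefix_order records the
    # order each prefix was first seen:
    #   split_map    == {b'f1': [(b'f1', b'r1'), (b'f1', b'r2')],
    #                    b'f2': [(b'f2', b'r1')]}
    #   prefix_order == [b'f1', b'f2']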
1397
def _group_keys_for_io(self, keys, non_local_keys, positions,
1398
_min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
1399
"""For the given keys, group them into 'best-sized' requests.
1401
The idea is to avoid making 1 request per file, but to never try to
1402
unpack an entire 1.5GB source tree in a single pass. Also when
1403
possible, we should try to group requests to the same pack file
1406
:return: list of (keys, non_local) tuples that indicate what keys
1407
should be fetched next.
1409
# TODO: Ideally we would group on 2 factors. We want to extract texts
1410
# from the same pack file together, and we want to extract all
1411
# the texts for a given build-chain together. Ultimately it
1412
# probably needs a better global view.
1413
total_keys = len(keys)
1414
prefix_split_keys, prefix_order = self._split_by_prefix(keys)
1415
prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
1417
cur_non_local = set()
1421
for prefix in prefix_order:
1422
keys = prefix_split_keys[prefix]
1423
non_local = prefix_split_non_local_keys.get(prefix, [])
1425
this_size = self._index._get_total_build_size(keys, positions)
1426
cur_size += this_size
1427
cur_keys.extend(keys)
1428
cur_non_local.update(non_local)
1429
if cur_size > _min_buffer_size:
1430
result.append((cur_keys, cur_non_local))
1431
sizes.append(cur_size)
1433
cur_non_local = set()
1436
result.append((cur_keys, cur_non_local))
1437
sizes.append(cur_size)
1440
def get_record_stream(self, keys, ordering, include_delta_closure):
1441
"""Get a stream of records for keys.
1443
:param keys: The keys to include.
1444
:param ordering: Either 'unordered' or 'topological'. A topologically
1445
sorted stream has compression parents strictly before their
1447
:param include_delta_closure: If True then the closure across any
1448
compression parents will be included (in the opaque data).
1449
:return: An iterator of ContentFactory objects, each of which is only
1450
valid until the iterator is advanced.
1452
# keys might be a generator
1456
if not self._index.has_graph:
1457
# Cannot sort when no graph has been stored.
1458
ordering = 'unordered'
1460
remaining_keys = keys
1463
keys = set(remaining_keys)
1464
for content_factory in self._get_remaining_record_stream(keys,
1465
ordering, include_delta_closure):
1466
remaining_keys.discard(content_factory.key)
1467
yield content_factory
1469
except errors.RetryWithNewPacks as e:
1470
self._access.reload_or_raise(e)
1472
def _get_remaining_record_stream(self, keys, ordering,
1473
include_delta_closure):
1474
"""This function is the 'retry' portion for get_record_stream."""
1475
if include_delta_closure:
1476
positions = self._get_components_positions(
1477
keys, allow_missing=True)
1479
build_details = self._index.get_build_details(keys)
1481
# (record_details, access_memo, compression_parent_key)
1482
positions = dict((key, self._build_details_to_components(details))
1483
for key, details in viewitems(build_details))
1484
absent_keys = keys.difference(set(positions))
1485
# There may be more absent keys : if we're missing the basis component
1486
# and are trying to include the delta closure.
1487
# XXX: We should not ever need to examine remote sources because we do
1488
# not permit deltas across versioned files boundaries.
1489
if include_delta_closure:
1490
needed_from_fallback = set()
1491
# Build up reconstructable_keys dict. key:True in this dict means
1492
# the key can be reconstructed.
1493
reconstructable_keys = {}
1497
chain = [key, positions[key][2]]
1499
needed_from_fallback.add(key)
1502
while chain[-1] is not None:
1503
if chain[-1] in reconstructable_keys:
1504
result = reconstructable_keys[chain[-1]]
1508
chain.append(positions[chain[-1]][2])
1510
# missing basis component
1511
needed_from_fallback.add(chain[-1])
1514
for chain_key in chain[:-1]:
1515
reconstructable_keys[chain_key] = result
1517
needed_from_fallback.add(key)
1518
# Double index lookups here : need a unified api ?
1519
global_map, parent_maps = self._get_parent_map_with_sources(keys)
1520
if ordering in ('topological', 'groupcompress'):
1521
if ordering == 'topological':
1522
# Global topological sort
1523
present_keys = tsort.topo_sort(global_map)
1525
present_keys = sort_groupcompress(global_map)
1526
# Now group by source:
1528
current_source = None
1529
for key in present_keys:
1530
for parent_map in parent_maps:
1531
if key in parent_map:
1532
key_source = parent_map
1534
if current_source is not key_source:
1535
source_keys.append((key_source, []))
1536
current_source = key_source
1537
source_keys[-1][1].append(key)
1539
if ordering != 'unordered':
1540
raise AssertionError('valid values for ordering are:'
1541
' "unordered", "groupcompress" or "topological" not: %r'
1543
# Just group by source; remote sources first.
1546
for parent_map in reversed(parent_maps):
1547
source_keys.append((parent_map, []))
1548
for key in parent_map:
1549
present_keys.append(key)
1550
source_keys[-1][1].append(key)
1551
# We have been requested to return these records in an order that
1552
# suits us. So we ask the index to give us an optimally sorted
1554
for source, sub_keys in source_keys:
1555
if source is parent_maps[0]:
1556
# Only sort the keys for this VF
1557
self._index._sort_keys_by_io(sub_keys, positions)
1558
absent_keys = keys - set(global_map)
1559
for key in absent_keys:
1560
yield AbsentContentFactory(key)
1561
# restrict our view to the keys we can answer.
1562
# XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
1563
# XXX: At that point we need to consider the impact of double reads by
1564
# utilising components multiple times.
1565
if include_delta_closure:
1566
# XXX: get_content_maps performs its own index queries; allow state
1568
non_local_keys = needed_from_fallback - absent_keys
1569
for keys, non_local_keys in self._group_keys_for_io(present_keys,
1572
generator = _VFContentMapGenerator(self, keys, non_local_keys,
1575
for record in generator.get_record_stream():
1578
for source, keys in source_keys:
1579
if source is parent_maps[0]:
1580
# this KnitVersionedFiles
1581
records = [(key, positions[key][1]) for key in keys]
1582
for key, raw_data in self._read_records_iter_unchecked(records):
1583
(record_details, index_memo, _) = positions[key]
1584
yield KnitContentFactory(key, global_map[key],
1585
record_details, None, raw_data, self._factory.annotated, None)
1587
vf = self._immediate_fallback_vfs[parent_maps.index(
1589
for record in vf.get_record_stream(keys, ordering,
1590
include_delta_closure):
1593
def get_sha1s(self, keys):
1594
"""See VersionedFiles.get_sha1s()."""
1596
record_map = self._get_record_map(missing, allow_missing=True)
1598
for key, details in viewitems(record_map):
1599
if key not in missing:
1601
# record entry 2 is the 'digest'.
1602
result[key] = details[2]
1603
missing.difference_update(set(result))
1604
for source in self._immediate_fallback_vfs:
1607
new_result = source.get_sha1s(missing)
1608
result.update(new_result)
1609
missing.difference_update(set(new_result))
1612
def insert_record_stream(self, stream):
1613
"""Insert a record stream into this container.
1615
:param stream: A stream of records to insert.
1617
:seealso VersionedFiles.get_record_stream:
1619
def get_adapter(adapter_key):
1621
return adapters[adapter_key]
1623
adapter_factory = adapter_registry.get(adapter_key)
1624
adapter = adapter_factory(self)
1625
adapters[adapter_key] = adapter
1628
if self._factory.annotated:
1629
# self is annotated, we need annotated knits to use directly.
1630
annotated = "annotated-"
1633
# self is not annotated, but we can strip annotations cheaply.
1635
convertibles = {"knit-annotated-ft-gz"}
1636
if self._max_delta_chain:
1637
delta_types.add("knit-annotated-delta-gz")
1638
convertibles.add("knit-annotated-delta-gz")
1639
# The set of types we can cheaply adapt without needing basis texts.
1640
native_types = set()
1641
if self._max_delta_chain:
1642
native_types.add("knit-%sdelta-gz" % annotated)
1643
delta_types.add("knit-%sdelta-gz" % annotated)
1644
native_types.add("knit-%sft-gz" % annotated)
1645
knit_types = native_types.union(convertibles)
1647
# Buffer all index entries that we can't add immediately because their
1648
# basis parent is missing. We don't buffer all because generating
1649
# annotations may require access to some of the new records. However we
1650
# can't generate annotations from new deltas until their basis parent
1651
# is present anyway, so we get away with not needing an index that
1652
# includes the new keys.
1654
# See <http://launchpad.net/bugs/300177> about ordering of compression
1655
# parents in the records - to be conservative, we insist that all
1656
# parents must be present to avoid expanding to a fulltext.
1658
# key = basis_parent, value = index entry to add
1659
buffered_index_entries = {}
1660
for record in stream:
1661
kind = record.storage_kind
1662
if kind.startswith('knit-') and kind.endswith('-gz'):
1663
# Check that the ID in the header of the raw knit bytes matches
1664
# the record metadata.
1665
raw_data = record._raw_record
1666
df, rec = self._parse_record_header(record.key, raw_data)
1669
parents = record.parents
1670
if record.storage_kind in delta_types:
1671
# TODO: eventually the record itself should track
1672
# compression_parent
1673
compression_parent = parents[0]
1675
compression_parent = None
1676
# Raise an error when a record is missing.
1677
if record.storage_kind == 'absent':
1678
raise RevisionNotPresent([record.key], self)
1679
elif ((record.storage_kind in knit_types) and
1680
(compression_parent is None or
1681
not self._immediate_fallback_vfs or
1682
compression_parent in self._index or
1683
compression_parent not in self)):
1684
# we can insert the knit record literally if either it has no
1685
# compression parent OR we already have its basis in this kvf
1686
# OR the basis is not present even in the fallbacks. In the
1687
# last case it will either turn up later in the stream and all
1688
# will be well, or it won't turn up at all and we'll raise an
1691
# TODO: self.__contains__ is somewhat redundant with
1692
# self._index.__contains__; we really want something that directly
1693
# asks if it's only present in the fallbacks. -- mbp 20081119
1694
if record.storage_kind not in native_types:
1696
adapter_key = (record.storage_kind, "knit-delta-gz")
1697
adapter = get_adapter(adapter_key)
1699
adapter_key = (record.storage_kind, "knit-ft-gz")
1700
adapter = get_adapter(adapter_key)
1701
bytes = adapter.get_bytes(record)
1703
# It's a knit record, it has a _raw_record field (even if
1704
# it was reconstituted from a network stream).
1705
bytes = record._raw_record
1706
options = [record._build_details[0].encode('ascii')]
1707
if record._build_details[1]:
1708
options.append(b'no-eol')
1709
# Just blat it across.
1710
# Note: This does end up adding data on duplicate keys. As
1711
# modern repositories use atomic insertions this should not
1712
# lead to excessive growth in the event of interrupted fetches.
1713
# 'knit' repositories may suffer excessive growth, but as a
1714
# deprecated format this is tolerable. It can be fixed if
1715
# needed by in the kndx index support raising on a duplicate
1716
# add with identical parents and options.
1717
access_memo = self._access.add_raw_records(
1718
[(record.key, len(bytes))], bytes)[0]
1719
index_entry = (record.key, options, access_memo, parents)
1720
if b'fulltext' not in options:
1721
# Not a fulltext, so we need to make sure the compression
1722
# parent will also be present.
1723
# Note that pack backed knits don't need to buffer here
1724
# because they buffer all writes to the transaction level,
1725
# but we don't expose that difference at the index level. If
1726
# the query here has sufficient cost to show up in
1727
# profiling we should do that.
1729
# They're required to be physically in this
1730
# KnitVersionedFiles, not in a fallback.
1731
if compression_parent not in self._index:
1732
pending = buffered_index_entries.setdefault(
1733
compression_parent, [])
1734
pending.append(index_entry)
1737
self._index.add_records([index_entry])
1738
elif record.storage_kind == 'chunked':
1739
self.add_lines(record.key, parents,
1740
osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1742
# Not suitable for direct insertion as a
1743
# delta, either because it's not the right format, or this
1744
# KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1745
# 0) or because it depends on a base only present in the
1747
self._access.flush()
1749
# Try getting a fulltext directly from the record.
1750
bytes = record.get_bytes_as('fulltext')
1751
except errors.UnavailableRepresentation:
1752
adapter_key = record.storage_kind, 'fulltext'
1753
adapter = get_adapter(adapter_key)
1754
bytes = adapter.get_bytes(record)
1755
lines = split_lines(bytes)
1757
self.add_lines(record.key, parents, lines)
1758
except errors.RevisionAlreadyPresent:
1760
# Add any records whose basis parent is now available.
1762
added_keys = [record.key]
1764
key = added_keys.pop(0)
1765
if key in buffered_index_entries:
1766
index_entries = buffered_index_entries[key]
1767
self._index.add_records(index_entries)
1769
[index_entry[0] for index_entry in index_entries])
1770
del buffered_index_entries[key]
1771
if buffered_index_entries:
1772
# There were index entries buffered at the end of the stream,
1773
# So these need to be added (if the index supports holding such
1774
# entries for later insertion)
1776
for key in buffered_index_entries:
1777
index_entries = buffered_index_entries[key]
1778
all_entries.extend(index_entries)
1779
self._index.add_records(
1780
all_entries, missing_compression_parents=True)
1782
def get_missing_compression_parent_keys(self):
1783
"""Return an iterable of keys of missing compression parents.
1785
Check this after calling insert_record_stream to find out if there are
1786
any missing compression parents. If there are, the records that
1787
depend on them are not able to be inserted safely. For atomic
1788
KnitVersionedFiles built on packs, the transaction should be aborted or
1789
suspended - commit will fail at this point. Nonatomic knits will error
1790
earlier because they have no staging area to put pending entries into.
1792
return self._index.get_missing_compression_parents()
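# A minimal usage sketch (not from the original source; 'vf' and 'stream'
# are illustrative names for a KnitVersionedFiles and a record stream):
#
#   vf.insert_record_stream(stream)
#   missing = vf.get_missing_compression_parent_keys()
#   if missing:
#       # Fetch and insert the basis texts named in 'missing', or abort the
#       # surrounding write group / transaction before committing.
#       pass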
1794
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1795
"""Iterate over the lines in the versioned files from keys.
1797
This may return lines from other keys. Each item the returned
1798
iterator yields is a tuple of a line and a text version that that line
1799
is present in (not introduced in).
1801
Ordering of results is in whatever order is most suitable for the
1802
underlying storage format.
1804
If a progress bar is supplied, it may be used to indicate progress.
1805
The caller is responsible for cleaning up progress bars (because this
1809
* Lines are normalised by the underlying store: they will all have \\n
1811
* Lines are returned in arbitrary order.
1812
* If a requested key did not change any lines (or didn't have any
1813
lines), it may not be mentioned at all in the result.
1815
:param pb: Progress bar supplied by caller.
1816
:return: An iterator over (line, key).
1819
pb = ui.ui_factory.nested_progress_bar()
1825
# we don't care about inclusions, the caller cares.
1826
# but we need to setup a list of records to visit.
1827
# we need key, position, length
1829
build_details = self._index.get_build_details(keys)
1830
for key, details in viewitems(build_details):
1832
key_records.append((key, details[0]))
1833
records_iter = enumerate(self._read_records_iter(key_records))
1834
for (key_idx, (key, data, sha_value)) in records_iter:
1835
pb.update(gettext('Walking content'), key_idx, total)
1836
compression_parent = build_details[key][1]
1837
if compression_parent is None:
1839
line_iterator = self._factory.get_fulltext_content(
1843
line_iterator = self._factory.get_linedelta_content(
1845
# Now that we are yielding the data for this key, remove it
1848
# XXX: It might be more efficient to yield (key,
1849
# line_iterator) in the future. However for now, this is a
1850
# simpler change to integrate into the rest of the
1851
# codebase. RBC 20071110
1852
for line in line_iterator:
1855
except errors.RetryWithNewPacks as e:
1856
self._access.reload_or_raise(e)
1857
# If there are still keys we've not yet found, we look in the fallback
1858
# vfs, and hope to find them there. Note that if the keys are found
1859
# but had no changes or no content, the fallback may not return
1861
if keys and not self._immediate_fallback_vfs:
1862
# XXX: strictly the second parameter is meant to be the file id
1863
# but it's not easily accessible here.
1864
raise RevisionNotPresent(keys, repr(self))
1865
for source in self._immediate_fallback_vfs:
1869
for line, key in source.iter_lines_added_or_present_in_keys(keys):
1870
source_keys.add(key)
1872
keys.difference_update(source_keys)
1873
pb.update(gettext('Walking content'), total, total)
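# Illustrative use (names invented): find which of a set of keys touch a
# given byte pattern, without extracting each full text separately.
#
#   matches = set()
#   for line, key in vf.iter_lines_added_or_present_in_keys(keys):
#       if b'Copyright' in line:
#           matches.add(key)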
1875
def _make_line_delta(self, delta_seq, new_content):
1876
"""Generate a line delta from delta_seq and new_content."""
1878
for op in delta_seq.get_opcodes():
1879
if op[0] == 'equal':
1882
(op[1], op[2], op[4] - op[3], new_content._lines[op[3]:op[4]]))
1885
def _merge_annotations(self, content, parents, parent_texts={},
1886
delta=None, annotated=None,
1887
left_matching_blocks=None):
1888
"""Merge annotations for content and generate deltas.
1890
This is done by comparing the annotations based on changes to the text
1891
and generating a delta on the resulting full texts. If annotations are
1892
not being created then a simple delta is created.
1894
if left_matching_blocks is not None:
1895
delta_seq = diff._PrematchedMatcher(left_matching_blocks)
1899
for parent_key in parents:
1900
merge_content = self._get_content(parent_key, parent_texts)
1901
if (parent_key == parents[0] and delta_seq is not None):
1904
seq = patiencediff.PatienceSequenceMatcher(
1905
None, merge_content.text(), content.text())
1906
for i, j, n in seq.get_matching_blocks():
1909
# this copies (origin, text) pairs across to the new
1910
# content for any line that matches the last-checked
1912
content._lines[j:j + n] = merge_content._lines[i:i + n]
1913
# XXX: Robert says the following block is a workaround for a
1914
# now-fixed bug and it can probably be deleted. -- mbp 20080618
1915
if content._lines and not content._lines[-1][1].endswith(b'\n'):
1916
# The copied annotation was from a line without a trailing EOL,
1917
# reinstate one for the content object, to ensure correct
1919
line = content._lines[-1][1] + b'\n'
1920
content._lines[-1] = (content._lines[-1][0], line)
1922
if delta_seq is None:
1923
reference_content = self._get_content(parents[0], parent_texts)
1924
new_texts = content.text()
1925
old_texts = reference_content.text()
1926
delta_seq = patiencediff.PatienceSequenceMatcher(
1927
None, old_texts, new_texts)
1928
return self._make_line_delta(delta_seq, content)
1930
def _parse_record(self, version_id, data):
1931
"""Parse an original format knit record.
1933
These have the last element of the key only present in the stored data.
1935
rec, record_contents = self._parse_record_unchecked(data)
1936
self._check_header_version(rec, version_id)
1937
return record_contents, rec[3]
1939
def _parse_record_header(self, key, raw_data):
1940
"""Parse a record header for consistency.
1942
:return: the decompressor stream and the parsed header,
as (stream, header_record)
1945
df = gzip.GzipFile(mode='rb', fileobj=BytesIO(raw_data))
1948
rec = self._check_header(key, df.readline())
1949
except Exception as e:
1950
raise KnitCorrupt(self,
1951
"While reading {%s} got %s(%s)"
1952
% (key, e.__class__.__name__, str(e)))
1955
def _parse_record_unchecked(self, data):
1957
# 4168 calls in 2880 217 internal
1958
# 4168 calls to _parse_record_header in 2121
1959
# 4168 calls to readlines in 330
1960
with gzip.GzipFile(mode='rb', fileobj=BytesIO(data)) as df:
1962
record_contents = df.readlines()
1963
except Exception as e:
1964
raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
1965
(data, e.__class__.__name__, str(e)))
1966
header = record_contents.pop(0)
1967
rec = self._split_header(header)
1968
last_line = record_contents.pop()
1969
if len(record_contents) != int(rec[2]):
1970
raise KnitCorrupt(self,
1971
'incorrect number of lines %s != %s'
1972
' for version {%s} %s'
1973
% (len(record_contents), int(rec[2]),
1974
rec[1], record_contents))
1975
if last_line != b'end %s\n' % rec[1]:
1976
raise KnitCorrupt(self,
1977
'unexpected version end line %r, wanted %r'
1978
% (last_line, rec[1]))
1979
return rec, record_contents
1981
def _read_records_iter(self, records):
1982
"""Read text records from data file and yield result.
1984
The result will be returned in whatever order is fastest to read,
not the order requested. Also, multiple requests for the same
record will only yield 1 response.
1988
:param records: A list of (key, access_memo) entries
1989
:return: Yields (key, contents, digest) in the order
1990
read, not the order requested
1995
# XXX: This smells wrong, IO may not be getting ordered right.
1996
needed_records = sorted(set(records), key=operator.itemgetter(1))
1997
if not needed_records:
2000
# The transport optimizes the fetching as well
2001
# (ie, reads continuous ranges.)
2002
raw_data = self._access.get_raw_records(
2003
[index_memo for key, index_memo in needed_records])
2005
for (key, index_memo), data in zip(needed_records, raw_data):
2006
content, digest = self._parse_record(key[-1], data)
2007
yield key, content, digest
2009
def _read_records_iter_raw(self, records):
2010
"""Read text records from data file and yield raw data.
2012
This unpacks enough of the text record to validate the id is
2013
as expected but thats all.
2015
Each item the iterator yields is (key, bytes,
2016
expected_sha1_of_full_text).
2018
for key, data in self._read_records_iter_unchecked(records):
2019
# validate the header (note that we can only use the suffix in
2020
# current knit records).
2021
df, rec = self._parse_record_header(key, data)
2023
yield key, data, rec[3]
2025
def _read_records_iter_unchecked(self, records):
2026
"""Read text records from data file and yield raw data.
2028
No validation is done.
2030
Yields tuples of (key, data).
2032
# setup an iterator of the external records:
2033
# uses readv so nice and fast we hope.
2035
# grab the disk data needed.
2036
needed_offsets = [index_memo for key, index_memo
2038
raw_records = self._access.get_raw_records(needed_offsets)
2040
for key, index_memo in records:
2041
data = next(raw_records)
2044
def _record_to_data(self, key, digest, lines, dense_lines=None):
2045
"""Convert key, digest, lines into a raw data block.
2047
:param key: The key of the record. Currently keys are always serialised
using just the trailing component.
:param dense_lines: The bytes of lines but in a denser form. For
instance, if lines is a list of 1000 bytestrings each ending in
\\n, dense_lines may be a list with one line in it, containing all
the 1000 lines and their \\n's. Using dense_lines if it is
already known is a win because the string join to create bytes in
this function spends less time resizing the final string.
2055
:return: (len, a BytesIO instance with the raw data ready to read.)
2057
chunks = [b"version %s %d %s\n" % (key[-1], len(lines), digest)]
2058
chunks.extend(dense_lines or lines)
2059
chunks.append(b"end " + key[-1] + b"\n")
2060
for chunk in chunks:
2061
if not isinstance(chunk, bytes):
2062
raise AssertionError(
2063
'data must be plain bytes was %s' % type(chunk))
2064
if lines and not lines[-1].endswith(b'\n'):
2065
raise ValueError('corrupt lines value %r' % lines)
2066
compressed_bytes = b''.join(tuned_gzip.chunks_to_gzip(chunks))
2067
return len(compressed_bytes), compressed_bytes
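# For reference, the uncompressed payload built above looks roughly like
# this (key and digest values invented for illustration):
#
#   version rev-id-1 2 3f786850e387550fdab836ed7e6dc881de23001b
#   first line of the stored (fulltext or delta) content
#   second line of the stored content
#   end rev-id-1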
2069
def _split_header(self, line):
2072
raise KnitCorrupt(self,
2073
'unexpected number of elements in record header')
2077
"""See VersionedFiles.keys."""
2078
if 'evil' in debug.debug_flags:
2079
trace.mutter_callsite(2, "keys scales with size of history")
2080
sources = [self._index] + self._immediate_fallback_vfs
2082
for source in sources:
2083
result.update(source.keys())
2087
class _ContentMapGenerator(object):
2088
"""Generate texts or expose raw deltas for a set of texts."""
2090
def __init__(self, ordering='unordered'):
2091
self._ordering = ordering
2093
def _get_content(self, key):
2094
"""Get the content object for key."""
2095
# Note that _get_content is only called when the _ContentMapGenerator
2096
# has been constructed with just one key requested for reconstruction.
2097
if key in self.nonlocal_keys:
2098
record = next(self.get_record_stream())
2099
# Create a content object on the fly
2100
lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
2101
return PlainKnitContent(lines, record.key)
2103
# local keys we can ask for directly
2104
return self._get_one_work(key)
2106
def get_record_stream(self):
2107
"""Get a record stream for the keys requested during __init__."""
2108
for record in self._work():
2112
"""Produce maps of text and KnitContents as dicts.
2114
:return: (text_map, content_map) where text_map contains the texts for
2115
the requested versions and content_map contains the KnitContents.
2117
# NB: By definition we never need to read remote sources unless texts
# are requested from them: we don't delta across stores - and we
# explicitly do not want to, in order to prevent data loss situations.
2120
if self.global_map is None:
2121
self.global_map = self.vf.get_parent_map(self.keys)
2122
nonlocal_keys = self.nonlocal_keys
2124
missing_keys = set(nonlocal_keys)
2125
# Read from remote versioned file instances and provide to our caller.
2126
for source in self.vf._immediate_fallback_vfs:
2127
if not missing_keys:
2129
# Loop over fallback repositories asking them for texts - ignore
2130
# any missing from a particular fallback.
2131
for record in source.get_record_stream(missing_keys,
2132
self._ordering, True):
2133
if record.storage_kind == 'absent':
2134
# Not in this particular stream, may be in one of the
# other fallback vfs objects.
2137
missing_keys.remove(record.key)
2140
if self._raw_record_map is None:
2141
raise AssertionError('_raw_record_map should have been filled')
2143
for key in self.keys:
2144
if key in self.nonlocal_keys:
2146
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2149
def _get_one_work(self, requested_key):
2150
# Now, if we have calculated everything already, just return the
2152
if requested_key in self._contents_map:
2153
return self._contents_map[requested_key]
2154
# To simplify things, parse everything at once - code that wants one text
# probably wants them all.
# FUTURE: This function could be improved for the 'extract many' case
# by tracking each component and only doing the copy when the number of
# children that need to apply deltas to it is > 1 or it is part of the
2160
multiple_versions = len(self.keys) != 1
2161
if self._record_map is None:
2162
self._record_map = self.vf._raw_map_to_record_map(
2163
self._raw_record_map)
2164
record_map = self._record_map
2165
# raw_record_map is key:
2166
# Have read and parsed records at this point.
2167
for key in self.keys:
2168
if key in self.nonlocal_keys:
2173
while cursor is not None:
2175
record, record_details, digest, next = record_map[cursor]
2177
raise RevisionNotPresent(cursor, self)
2178
components.append((cursor, record, record_details, digest))
2180
if cursor in self._contents_map:
2181
# no need to plan further back
2182
components.append((cursor, None, None, None))
2186
for (component_id, record, record_details,
2187
digest) in reversed(components):
2188
if component_id in self._contents_map:
2189
content = self._contents_map[component_id]
2191
content, delta = self._factory.parse_record(key[-1],
2192
record, record_details, content,
2193
copy_base_content=multiple_versions)
2194
if multiple_versions:
2195
self._contents_map[component_id] = content
2197
# digest here is the digest from the last applied component.
2198
text = content.text()
2199
actual_sha = sha_strings(text)
2200
if actual_sha != digest:
2201
raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
2202
if multiple_versions:
2203
return self._contents_map[requested_key]
2207
def _wire_bytes(self):
2208
"""Get the bytes to put on the wire for 'key'.
2210
The first collection of bytes asked for returns the serialised
2211
raw_record_map and the additional details (key, parent) for key.
2212
Subsequent calls return just the additional details (key, parent).
2213
The wire storage_kind given for the first key is 'knit-delta-closure';
for subsequent keys it is 'knit-delta-closure-ref'.
2216
:param key: A key from the content generator.
2217
:return: Bytes to put on the wire.
2220
# kind marker for dispatch on the far side,
2221
lines.append(b'knit-delta-closure')
2223
if self.vf._factory.annotated:
2224
lines.append(b'annotated')
2227
# then the list of keys
2228
lines.append(b'\t'.join(b'\x00'.join(key) for key in self.keys
2229
if key not in self.nonlocal_keys))
2230
# then the _raw_record_map in serialised form:
2232
# for each item in the map:
2234
# 1 line with parents if the key is to be yielded (None: for None, '' for ())
2235
# one line with method
2236
# one line with noeol
2237
# one line with next ('' for None)
2238
# one line with byte count of the record bytes
2240
for key, (record_bytes, (method, noeol), next) in viewitems(
2241
self._raw_record_map):
2242
key_bytes = b'\x00'.join(key)
2243
parents = self.global_map.get(key, None)
2245
parent_bytes = b'None:'
2247
parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
2248
method_bytes = method.encode('ascii')
2254
next_bytes = b'\x00'.join(next)
2257
map_byte_list.append(b'\n'.join(
2258
[key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
2259
b'%d' % len(record_bytes), record_bytes]))
2260
map_bytes = b''.join(map_byte_list)
2261
lines.append(map_bytes)
2262
bytes = b'\n'.join(lines)
2266
class _VFContentMapGenerator(_ContentMapGenerator):
2267
"""Content map generator reading from a VersionedFiles object."""
2269
def __init__(self, versioned_files, keys, nonlocal_keys=None,
2270
global_map=None, raw_record_map=None, ordering='unordered'):
2271
"""Create a _ContentMapGenerator.
2273
:param versioned_files: The versioned files that the texts are being
extracted from.
:param keys: The keys to produce content maps for.
:param nonlocal_keys: An iterable of keys (possibly intersecting keys)
which are known not to be in this knit, but rather in one of the
fallback versioned files.
:param global_map: The result of get_parent_map(keys) (or a supermap).
This is required if get_record_stream() is to be used.
:param raw_record_map: An unparsed raw record map to use for answering
contents.
2284
_ContentMapGenerator.__init__(self, ordering=ordering)
2285
# The vf to source data from
2286
self.vf = versioned_files
2288
self.keys = list(keys)
2289
# Keys known to be in fallback vfs objects
2290
if nonlocal_keys is None:
2291
self.nonlocal_keys = set()
2293
self.nonlocal_keys = frozenset(nonlocal_keys)
2294
# Parents data for keys to be returned in get_record_stream
2295
self.global_map = global_map
2296
# The chunked lists for self.keys in text form
2298
# A cache of KnitContent objects used in extracting texts.
2299
self._contents_map = {}
2300
# All the knit records needed to assemble the requested keys as full
2302
self._record_map = None
2303
if raw_record_map is None:
2304
self._raw_record_map = self.vf._get_record_map_unparsed(keys,
2307
self._raw_record_map = raw_record_map
2308
# the factory for parsing records
2309
self._factory = self.vf._factory
2312
class _NetworkContentMapGenerator(_ContentMapGenerator):
2313
"""Content map generator sourced from a network stream."""
2315
def __init__(self, bytes, line_end):
2316
"""Construct a _NetworkContentMapGenerator from a bytes block."""
2318
self.global_map = {}
2319
self._raw_record_map = {}
2320
self._contents_map = {}
2321
self._record_map = None
2322
self.nonlocal_keys = []
2323
# Get access to record parsing facilities
2324
self.vf = KnitVersionedFiles(None, None)
2327
line_end = bytes.find(b'\n', start)
2328
line = bytes[start:line_end]
2329
start = line_end + 1
2330
if line == b'annotated':
2331
self._factory = KnitAnnotateFactory()
2333
self._factory = KnitPlainFactory()
2334
# list of keys to emit in get_record_stream
2335
line_end = bytes.find(b'\n', start)
2336
line = bytes[start:line_end]
2337
start = line_end + 1
2339
tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
2341
# now a loop until the end. XXX: It would be nice if this was just a
2342
# bunch of the same records as get_record_stream(..., False) gives, but
2343
# there is a decent sized gap stopping that at the moment.
2347
line_end = bytes.find(b'\n', start)
2348
key = tuple(bytes[start:line_end].split(b'\x00'))
2349
start = line_end + 1
2350
# 1 line with parents (None: for None, '' for ())
2351
line_end = bytes.find(b'\n', start)
2352
line = bytes[start:line_end]
2353
if line == b'None:':
2357
tuple(segment.split(b'\x00')) for segment in line.split(b'\t')
2359
self.global_map[key] = parents
2360
start = line_end + 1
2361
# one line with method
2362
line_end = bytes.find(b'\n', start)
2363
line = bytes[start:line_end]
2364
method = line.decode('ascii')
2365
start = line_end + 1
2366
# one line with noeol
2367
line_end = bytes.find(b'\n', start)
2368
line = bytes[start:line_end]
2369
noeol = line == b"T"
2370
start = line_end + 1
2371
# one line with next (b'' for None)
2372
line_end = bytes.find(b'\n', start)
2373
line = bytes[start:line_end]
2377
next = tuple(bytes[start:line_end].split(b'\x00'))
2378
start = line_end + 1
2379
# one line with byte count of the record bytes
2380
line_end = bytes.find(b'\n', start)
2381
line = bytes[start:line_end]
2383
start = line_end + 1
2385
record_bytes = bytes[start:start + count]
2386
start = start + count
2388
self._raw_record_map[key] = (record_bytes, (method, noeol), next)
2390
def get_record_stream(self):
2391
"""Get a record stream for for keys requested by the bytestream."""
2393
for key in self.keys:
2394
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2397
def _wire_bytes(self):
2401
class _KndxIndex(object):
2402
"""Manages knit index files
2404
The index is kept in memory and read on startup, to enable
2405
fast lookups of revision information. The cursor of the index
2406
file is always pointing to the end, making it easy to append
2409
_cache is a cache for fast mapping from version id to a Index
2412
_history is a cache for fast mapping from indexes to version ids.
2414
The index data format is dictionary compressed when it comes to
2415
parent references; a index entry may only have parents that with a
2416
lover index number. As a result, the index is topological sorted.
2418
Duplicate entries may be written to the index for a single version id
2419
if this is done then the latter one completely replaces the former:
2420
this allows updates to correct version and parent information.
2421
Note that the two entries may share the delta, and that successive
2422
annotations and references MUST point to the first entry.
2424
The index file on disc contains a header, followed by one line per knit
2425
record. The same revision can be present in an index file more than once.
2426
The first occurrence gets assigned a sequence number starting from 0.
2428
The format of a single line is
2429
REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
2430
REVISION_ID is a utf8-encoded revision id
2431
FLAGS is a comma separated list of flags about the record. Values include
2432
no-eol, line-delta, fulltext.
2433
BYTE_OFFSET is the ascii representation of the byte offset in the data file
2434
that the compressed data starts at.
2435
LENGTH is the ascii representation of the length of the data file.
2436
PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
2438
PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
2439
revision id already in the knit that is a parent of REVISION_ID.
2440
The ' :' marker is the end of record marker.
2443
when a write is interrupted to the index file, it will result in a line
2444
that does not end in ' :'. If the ' :' is not present at the end of a line,
2445
or at the end of the file, then the record that is missing it will be
2446
ignored by the parser.
2448
When writing new records to the index file, the data is preceded by '\n'
2449
to ensure that records always start on new lines even if the last write was
2450
interrupted. As a result its normal for the last line in the index to be
2451
missing a trailing newline. One can be added with no harmful effects.
2453
:ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
2454
where prefix is e.g. the (fileid,) for .texts instances or () for
2455
constant-mapped things like .revisions, and the old state is
2456
tuple(history_vector, cache_dict). This is used to prevent having an
2457
ABI change with the C extension that reads .kndx files.
2460
HEADER = b"# bzr knit index 8\n"
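# Illustrative .kndx contents (record values invented): the header line above
# followed by one line per record, with parents either dictionary-compressed
# to a sequence number or given as a '.'-prefixed revision id.
#
#   # bzr knit index 8
#   rev-1 fulltext 0 120  :
#   rev-2 line-delta 120 85 0 :
#   rev-3 line-delta,no-eol 205 91 1 .rev-not-in-this-index :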
2462
def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
2463
"""Create a _KndxIndex on transport using mapper."""
2464
self._transport = transport
2465
self._mapper = mapper
2466
self._get_scope = get_scope
2467
self._allow_writes = allow_writes
2468
self._is_locked = is_locked
2470
self.has_graph = True
2472
def add_records(self, records, random_id=False, missing_compression_parents=False):
2473
"""Add multiple records to the index.
2475
:param records: a list of tuples:
2476
(key, options, access_memo, parents).
2477
:param random_id: If True the ids being added were randomly generated
2478
and no check for existence will be performed.
2479
:param missing_compression_parents: If False the records being added are
only compressed against texts already in the index (or inside
records). If True some records may refer to texts that are not yet
available as compression parents.
2484
if missing_compression_parents:
2485
# It might be nice to get the edge of the records. But keys isn't
2487
keys = sorted(record[0] for record in records)
2488
raise errors.RevisionNotPresent(keys, self)
2490
for record in records:
2493
path = self._mapper.map(key) + '.kndx'
2494
path_keys = paths.setdefault(path, (prefix, []))
2495
path_keys[1].append(record)
2496
for path in sorted(paths):
2497
prefix, path_keys = paths[path]
2498
self._load_prefixes([prefix])
2500
orig_history = self._kndx_cache[prefix][1][:]
2501
orig_cache = self._kndx_cache[prefix][0].copy()
2504
for key, options, (_, pos, size), parents in path_keys:
2505
if not all(isinstance(option, bytes) for option in options):
2506
raise TypeError(options)
2508
# kndx indices cannot be parentless.
2512
+ key[-1], b','.join(options), b'%d' % pos, b'%d' % size,
2513
self._dictionary_compress(parents), b':'])
2514
if not isinstance(line, bytes):
2515
raise AssertionError(
2516
'data must be utf8 was %s' % type(line))
2518
self._cache_key(key, options, pos, size, parents)
2519
if len(orig_history):
2520
self._transport.append_bytes(path, b''.join(lines))
2522
self._init_index(path, lines)
2524
# If any problems happen, restore the original values and re-raise
2525
self._kndx_cache[prefix] = (orig_cache, orig_history)
2528
def scan_unvalidated_index(self, graph_index):
2529
"""See _KnitGraphIndex.scan_unvalidated_index."""
2530
# Because kndx files do not support atomic insertion via separate index
2531
# files, they do not support this method.
2532
raise NotImplementedError(self.scan_unvalidated_index)
2534
def get_missing_compression_parents(self):
2535
"""See _KnitGraphIndex.get_missing_compression_parents."""
2536
# Because kndx files do not support atomic insertion via separate index
2537
# files, they do not support this method.
2538
raise NotImplementedError(self.get_missing_compression_parents)
2540
def _cache_key(self, key, options, pos, size, parent_keys):
2541
"""Cache a version record in the history array and index cache.
2543
This is inlined into _load_data for performance. KEEP IN SYNC.
2544
(It saves 60ms, 25% of the __init__ overhead on local 4000 record
2548
version_id = key[-1]
2549
# last-element only for compatibility with the C load_data.
2550
parents = tuple(parent[-1] for parent in parent_keys)
2551
for parent in parent_keys:
2552
if parent[:-1] != prefix:
2553
raise ValueError("mismatched prefixes for %r, %r" % (
2555
cache, history = self._kndx_cache[prefix]
2556
# only want the _history index to reference the 1st index entry
2558
if version_id not in cache:
2559
index = len(history)
2560
history.append(version_id)
2562
index = cache[version_id][5]
2563
cache[version_id] = (version_id,
2570
def check_header(self, fp):
2571
line = fp.readline()
2573
# An empty file can actually be treated as though the file doesn't
2575
raise errors.NoSuchFile(self)
2576
if line != self.HEADER:
2577
raise KnitHeaderError(badline=line, filename=self)
2579
def _check_read(self):
2580
if not self._is_locked():
2581
raise errors.ObjectNotLocked(self)
2582
if self._get_scope() != self._scope:
2585
def _check_write_ok(self):
2586
"""Assert if not writes are permitted."""
2587
if not self._is_locked():
2588
raise errors.ObjectNotLocked(self)
2589
if self._get_scope() != self._scope:
2591
if self._mode != 'w':
2592
raise errors.ReadOnlyObjectDirtiedError(self)
2594
def get_build_details(self, keys):
2595
"""Get the method, index_memo and compression parent for keys.
2597
Ghosts are omitted from the result.
2599
:param keys: An iterable of keys.
2600
:return: A dict of key:(index_memo, compression_parent, parents,
2603
opaque structure to pass to read_records to extract the raw
2606
Content that this record is built upon, may be None
2608
Logical parents of this node
2610
extra information about the content which needs to be passed to
2611
Factory.parse_record
2613
parent_map = self.get_parent_map(keys)
2616
if key not in parent_map:
2618
method = self.get_method(key)
2619
if not isinstance(method, str):
2620
raise TypeError(method)
2621
parents = parent_map[key]
2622
if method == 'fulltext':
2623
compression_parent = None
2625
compression_parent = parents[0]
2626
noeol = b'no-eol' in self.get_options(key)
2627
index_memo = self.get_position(key)
2628
result[key] = (index_memo, compression_parent,
2629
parents, (method, noeol))
2632
def get_method(self, key):
2633
"""Return compression method of specified key."""
2634
options = self.get_options(key)
2635
if b'fulltext' in options:
2637
elif b'line-delta' in options:
2640
raise KnitIndexUnknownMethod(self, options)
2642
def get_options(self, key):
2643
"""Return a list representing options.
2647
prefix, suffix = self._split_key(key)
2648
self._load_prefixes([prefix])
2650
return self._kndx_cache[prefix][0][suffix][1]
2652
raise RevisionNotPresent(key, self)
2654
def find_ancestry(self, keys):
2655
"""See CombinedGraphIndex.find_ancestry()"""
2656
prefixes = set(key[:-1] for key in keys)
2657
self._load_prefixes(prefixes)
2660
missing_keys = set()
2661
pending_keys = list(keys)
2662
# This assumes that keys will not reference parents in a different
2663
# prefix, which is accurate so far.
2665
key = pending_keys.pop()
2666
if key in parent_map:
2670
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2672
missing_keys.add(key)
2674
parent_keys = tuple([prefix + (suffix,)
2675
for suffix in suffix_parents])
2676
parent_map[key] = parent_keys
2677
pending_keys.extend([p for p in parent_keys
2678
if p not in parent_map])
2679
return parent_map, missing_keys
2681
def get_parent_map(self, keys):
2682
"""Get a map of the parents of keys.
2684
:param keys: The keys to look up parents for.
2685
:return: A mapping from keys to parents. Absent keys are absent from
2688
# Parse what we need to up front, this potentially trades off I/O
2689
# locality (.kndx and .knit in the same block group for the same file
2690
# id) for less checking in inner loops.
2691
prefixes = set(key[:-1] for key in keys)
2692
self._load_prefixes(prefixes)
2697
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2701
result[key] = tuple(prefix + (suffix,) for
2702
suffix in suffix_parents)
2705
def get_position(self, key):
2706
"""Return details needed to access the version.
2708
:return: a tuple (key, data position, size) to hand to the access
2709
logic to get the record.
2711
prefix, suffix = self._split_key(key)
2712
self._load_prefixes([prefix])
2713
entry = self._kndx_cache[prefix][0][suffix]
2714
return key, entry[2], entry[3]
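# Example shape of the returned memo (values invented): for a key
# ('file-id', 'rev-1') this is (('file-id', 'rev-1'), 0, 120), i.e. the
# record starts at byte 0 of the mapped .knit file and is 120 bytes long.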
2716
__contains__ = _mod_index._has_key_from_parent_map
2718
def _init_index(self, path, extra_lines=[]):
2719
"""Initialize an index."""
2721
sio.write(self.HEADER)
2722
sio.writelines(extra_lines)
2724
self._transport.put_file_non_atomic(path, sio,
2725
create_parent_dir=True)
2726
# self._create_parent_dir)
2727
# mode=self._file_mode,
2728
# dir_mode=self._dir_mode)
2731
"""Get all the keys in the collection.
2733
The keys are not ordered.
2736
# Identify all key prefixes.
2737
# XXX: A bit hacky, needs polish.
2738
if isinstance(self._mapper, ConstantMapper):
2742
for quoted_relpath in self._transport.iter_files_recursive():
2743
path, ext = os.path.splitext(quoted_relpath)
2745
prefixes = [self._mapper.unmap(path) for path in relpaths]
2746
self._load_prefixes(prefixes)
2747
for prefix in prefixes:
2748
for suffix in self._kndx_cache[prefix][1]:
2749
result.add(prefix + (suffix,))
2752
def _load_prefixes(self, prefixes):
2753
"""Load the indices for prefixes."""
2755
for prefix in prefixes:
2756
if prefix not in self._kndx_cache:
2757
# the load_data interface writes to these variables.
2760
self._filename = prefix
2762
path = self._mapper.map(prefix) + '.kndx'
2763
with self._transport.get(path) as fp:
2764
# _load_data may raise NoSuchFile if the target knit is
2766
_load_data(self, fp)
2767
self._kndx_cache[prefix] = (self._cache, self._history)
2772
self._kndx_cache[prefix] = ({}, [])
2773
if isinstance(self._mapper, ConstantMapper):
2774
# preserve behaviour for revisions.kndx etc.
2775
self._init_index(path)
2780
missing_keys = _mod_index._missing_keys_from_parent_map
2782
def _partition_keys(self, keys):
2783
"""Turn keys into a dict of prefix:suffix_list."""
2786
prefix_keys = result.setdefault(key[:-1], [])
2787
prefix_keys.append(key[-1])
2790
def _dictionary_compress(self, keys):
2791
"""Dictionary compress keys.
2793
:param keys: The keys to generate references to.
2794
:return: A string representation of keys. keys which are present are
2795
dictionary compressed, and others are emitted as fulltext with a
2801
prefix = keys[0][:-1]
2802
cache = self._kndx_cache[prefix][0]
2804
if key[:-1] != prefix:
2805
# kndx indices cannot refer across partitioned storage.
2806
raise ValueError("mismatched prefixes for %r" % keys)
2807
if key[-1] in cache:
2808
# -- inlined lookup() --
2809
result_list.append(b'%d' % cache[key[-1]][5])
2810
# -- end lookup () --
2812
result_list.append(b'.' + key[-1])
2813
return b' '.join(result_list)
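# Illustrative behaviour (cache contents invented): parents already present
# in this prefix's index are emitted as their sequence number, others as a
# '.'-prefixed fulltext reference, e.g.
#
#   _dictionary_compress([('rev-1',), ('rev-elsewhere',)])
#   # -> b'0 .rev-elsewhere'   (when 'rev-1' is entry 0 in the index)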
2815
def _reset_cache(self):
2816
# Possibly this should be a LRU cache. A dictionary from key_prefix to
2817
# (cache_dict, history_vector) for parsed kndx files.
2818
self._kndx_cache = {}
2819
self._scope = self._get_scope()
2820
allow_writes = self._allow_writes()
2826
def _sort_keys_by_io(self, keys, positions):
2827
"""Figure out an optimal order to read the records for the given keys.
2829
Sort keys, grouped by index and sorted by position.
2831
:param keys: A list of keys whose records we want to read. This will be
2833
:param positions: A dict, such as the one returned by
2834
_get_components_positions()
2837
def get_sort_key(key):
2838
index_memo = positions[key][1]
2839
# Group by prefix and position. index_memo[0] is the key, so it is
2840
# (file_id, revision_id) and we don't want to sort on revision_id,
2841
# index_memo[1] is the position, and index_memo[2] is the size,
2842
# which doesn't matter for the sort
2843
return index_memo[0][:-1], index_memo[1]
2844
return keys.sort(key=get_sort_key)
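# Sketch of the resulting order (keys and offsets invented): keys sharing a
# prefix are grouped together and ordered by their byte offset, so e.g.
# [('f1', 'b')@10, ('f2', 'a')@0, ('f1', 'a')@50] sorts to
# [('f1', 'b'), ('f1', 'a'), ('f2', 'a')].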
2846
_get_total_build_size = _get_total_build_size
2848
def _split_key(self, key):
2849
"""Split key into a prefix and suffix."""
2850
# GZ 2018-07-03: This is intentionally either a sequence or bytes?
2851
if isinstance(key, bytes):
2852
return key[:-1], key[-1:]
2853
return key[:-1], key[-1]
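# e.g. (illustrative) _split_key(('file-id', 'rev-1')) -> (('file-id',), 'rev-1'),
# while a plain bytes key keeps a bytes suffix, as noted above.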
2856
class _KnitGraphIndex(object):
2857
"""A KnitVersionedFiles index layered on GraphIndex."""
2859
def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2860
add_callback=None, track_external_parent_refs=False):
2861
"""Construct a KnitGraphIndex on a graph_index.
2863
:param graph_index: An implementation of breezy.index.GraphIndex.
2864
:param is_locked: A callback to check whether the object should answer
2866
:param deltas: Allow delta-compressed records.
2867
:param parents: If True, record knits parents, if not do not record
2869
:param add_callback: If not None, allow additions to the index and call
2870
this callback with a list of added GraphIndex nodes:
2871
[(node, value, node_refs), ...]
2872
:param is_locked: A callback, returns True if the index is locked and
2874
:param track_external_parent_refs: If True, record all external parent
2875
references parents from added records. These can be retrieved
2876
later by calling get_missing_parents().
2878
self._add_callback = add_callback
2879
self._graph_index = graph_index
2880
self._deltas = deltas
2881
self._parents = parents
2882
if deltas and not parents:
2883
# XXX: TODO: Delta tree and parent graph should be conceptually
2885
raise KnitCorrupt(self, "Cannot do delta compression without "
2887
self.has_graph = parents
2888
self._is_locked = is_locked
2889
self._missing_compression_parents = set()
2890
if track_external_parent_refs:
2891
self._key_dependencies = _KeyRefs()
2893
self._key_dependencies = None
2896
return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2898
def add_records(self, records, random_id=False,
2899
missing_compression_parents=False):
2900
"""Add multiple records to the index.
2902
This function does not insert data into the Immutable GraphIndex
2903
backing the KnitGraphIndex, instead it prepares data for insertion by
2904
the caller and checks that it is safe to insert then calls
2905
self._add_callback with the prepared GraphIndex nodes.
2907
:param records: a list of tuples:
2908
(key, options, access_memo, parents).
2909
:param random_id: If True the ids being added were randomly generated
2910
and no check for existence will be performed.
2911
:param missing_compression_parents: If False the records being added are
only compressed against texts already in the index (or inside
records). If True some records may refer to texts that are not yet
available as compression parents.
2916
if not self._add_callback:
2917
raise errors.ReadOnlyError(self)
2918
# we hope there are no repositories with inconsistent parentage
2922
compression_parents = set()
2923
key_dependencies = self._key_dependencies
2924
for (key, options, access_memo, parents) in records:
2926
parents = tuple(parents)
2927
if key_dependencies is not None:
2928
key_dependencies.add_references(key, parents)
2929
index, pos, size = access_memo
2930
if b'no-eol' in options:
2934
value += b"%d %d" % (pos, size)
2935
if not self._deltas:
2936
if b'line-delta' in options:
2938
self, "attempt to add line-delta in non-delta knit")
2941
if b'line-delta' in options:
2942
node_refs = (parents, (parents[0],))
2943
if missing_compression_parents:
2944
compression_parents.add(parents[0])
2946
node_refs = (parents, ())
2948
node_refs = (parents, )
2951
raise KnitCorrupt(self, "attempt to add node with parents "
2952
"in parentless index.")
2954
keys[key] = (value, node_refs)
2957
present_nodes = self._get_entries(keys)
2958
for (index, key, value, node_refs) in present_nodes:
2959
parents = node_refs[:1]
2960
# Sometimes these are passed as a list rather than a tuple
2961
passed = static_tuple.as_tuples(keys[key])
2962
passed_parents = passed[1][:1]
2963
if (value[0:1] != keys[key][0][0:1]
2964
or parents != passed_parents):
2965
node_refs = static_tuple.as_tuples(node_refs)
2966
raise KnitCorrupt(self, "inconsistent details in add_records"
2967
": %s %s" % ((value, node_refs), passed))
2971
for key, (value, node_refs) in viewitems(keys):
2972
result.append((key, value, node_refs))
2974
for key, (value, node_refs) in viewitems(keys):
2975
result.append((key, value))
2976
self._add_callback(result)
2977
if missing_compression_parents:
2978
# This may appear to be incorrect (it does not check for
2979
# compression parents that are in the existing graph index),
2980
# but such records won't have been buffered, so this is
2981
# actually correct: every entry when
2982
# missing_compression_parents==True either has a missing parent, or
2983
# a parent that is one of the keys in records.
2984
compression_parents.difference_update(keys)
2985
self._missing_compression_parents.update(compression_parents)
2986
# Adding records may have satisfied missing compression parents.
2987
self._missing_compression_parents.difference_update(keys)
2989
def scan_unvalidated_index(self, graph_index):
2990
"""Inform this _KnitGraphIndex that there is an unvalidated index.
2992
This allows this _KnitGraphIndex to keep track of any missing
2993
compression parents we may want to have filled in to make those
2996
:param graph_index: A GraphIndex
2999
new_missing = graph_index.external_references(ref_list_num=1)
3000
new_missing.difference_update(self.get_parent_map(new_missing))
3001
self._missing_compression_parents.update(new_missing)
3002
if self._key_dependencies is not None:
3003
# Add parent refs from graph_index (and discard parent refs that
3004
# the graph_index has).
3005
for node in graph_index.iter_all_entries():
3006
self._key_dependencies.add_references(node[1], node[3][0])
3008
def get_missing_compression_parents(self):
3009
"""Return the keys of missing compression parents.
3011
Missing compression parents occur when a record stream was missing
basis texts, or an index was scanned that had missing basis texts.
3014
return frozenset(self._missing_compression_parents)
3016
def get_missing_parents(self):
3017
"""Return the keys of missing parents."""
3018
# If updating this, you should also update
3019
# groupcompress._GCGraphIndex.get_missing_parents
3020
# We may have false positives, so filter those out.
3021
self._key_dependencies.satisfy_refs_for_keys(
3022
self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
3023
return frozenset(self._key_dependencies.get_unsatisfied_refs())
3025
def _check_read(self):
3026
"""raise if reads are not permitted."""
3027
if not self._is_locked():
3028
raise errors.ObjectNotLocked(self)
3030
def _check_write_ok(self):
3031
"""Assert if writes are not permitted."""
3032
if not self._is_locked():
3033
raise errors.ObjectNotLocked(self)
3035
def _compression_parent(self, an_entry):
3036
# return the key that an_entry is compressed against, or None
3037
# Grab the second parent list (as deltas implies parents currently)
3038
compression_parents = an_entry[3][1]
3039
if not compression_parents:
3041
if len(compression_parents) != 1:
3042
raise AssertionError(
3043
"Too many compression parents: %r" % compression_parents)
3044
return compression_parents[0]
3046
def get_build_details(self, keys):
3047
"""Get the method, index_memo and compression parent for version_ids.
3049
Ghosts are omitted from the result.
3051
:param keys: An iterable of keys.
3052
:return: A dict of key:
3053
(index_memo, compression_parent, parents, record_details).
3055
opaque structure to pass to read_records to extract the raw
3058
Content that this record is built upon, may be None
3060
Logical parents of this node
3062
extra information about the content which needs to be passed to
3063
Factory.parse_record
3067
entries = self._get_entries(keys, False)
3068
for entry in entries:
3070
if not self._parents:
3073
parents = entry[3][0]
3074
if not self._deltas:
3075
compression_parent_key = None
3077
compression_parent_key = self._compression_parent(entry)
3078
noeol = (entry[2][0:1] == b'N')
3079
if compression_parent_key:
3080
method = 'line-delta'
3083
result[key] = (self._node_to_position(entry),
3084
compression_parent_key, parents,
3088
def _get_entries(self, keys, check_present=False):
3089
"""Get the entries for keys.
3091
:param keys: An iterable of index key tuples.
3096
for node in self._graph_index.iter_entries(keys):
3098
found_keys.add(node[1])
3100
# adapt parentless index to the rest of the code.
3101
for node in self._graph_index.iter_entries(keys):
3102
yield node[0], node[1], node[2], ()
3103
found_keys.add(node[1])
3105
missing_keys = keys.difference(found_keys)
3107
raise RevisionNotPresent(missing_keys.pop(), self)
3109
def get_method(self, key):
3110
"""Return compression method of specified key."""
3111
return self._get_method(self._get_node(key))
3113
def _get_method(self, node):
3114
if not self._deltas:
3116
if self._compression_parent(node):
3121
def _get_node(self, key):
3123
return list(self._get_entries([key]))[0]
3125
raise RevisionNotPresent(key, self)
3127
def get_options(self, key):
3128
"""Return a list representing options.
3132
node = self._get_node(key)
3133
options = [self._get_method(node).encode('ascii')]
3134
if node[2][0:1] == b'N':
3135
options.append(b'no-eol')
3138
def find_ancestry(self, keys):
3139
"""See CombinedGraphIndex.find_ancestry()"""
3140
return self._graph_index.find_ancestry(keys, 0)
3142
def get_parent_map(self, keys):
3143
"""Get a map of the parents of keys.
3145
:param keys: The keys to look up parents for.
3146
:return: A mapping from keys to parents. Absent keys are absent from
3150
nodes = self._get_entries(keys)
3154
result[node[1]] = node[3][0]
3157
result[node[1]] = None
3160
def get_position(self, key):
3161
"""Return details needed to access the version.
3163
:return: a tuple (index, data position, size) to hand to the access
3164
logic to get the record.
3166
node = self._get_node(key)
3167
return self._node_to_position(node)
3169
__contains__ = _mod_index._has_key_from_parent_map
3172
"""Get all the keys in the collection.
3174
The keys are not ordered.
3177
return [node[1] for node in self._graph_index.iter_all_entries()]
3179
missing_keys = _mod_index._missing_keys_from_parent_map
3181
def _node_to_position(self, node):
3182
"""Convert an index value to position details."""
3183
bits = node[2][1:].split(b' ')
3184
return node[0], int(bits[0]), int(bits[1])
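# e.g. (invented value): a node whose value is b' 1234 567' (or b'N1234 567'
# when the no-eol flag is set) maps to (node's GraphIndex, 1234, 567).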
3186
def _sort_keys_by_io(self, keys, positions):
3187
"""Figure out an optimal order to read the records for the given keys.
3189
Sort keys, grouped by index and sorted by position.
3191
:param keys: A list of keys whose records we want to read. This will be
3193
:param positions: A dict, such as the one returned by
3194
_get_components_positions()
3197
def get_index_memo(key):
3198
# index_memo is at offset [1]. It is made up of (GraphIndex,
3199
# position, size). GI is an object, which will be unique for each
3200
# pack file. This causes us to group by pack file, then sort by
3201
# position. Size doesn't matter, but it isn't worth breaking up the
3203
return positions[key][1]
3204
return keys.sort(key=get_index_memo)
3206
_get_total_build_size = _get_total_build_size
3209
class _KnitKeyAccess(object):
3210
"""Access to records in .knit files."""
3212
def __init__(self, transport, mapper):
3213
"""Create a _KnitKeyAccess with transport and mapper.
3215
:param transport: The transport the access object is rooted at.
3216
:param mapper: The mapper used to map keys to .knit files.
3218
self._transport = transport
3219
self._mapper = mapper
3221
def add_raw_records(self, key_sizes, raw_data):
3222
"""Add raw knit bytes to a storage area.
3224
The data is spooled to the container writer in one bytes-record per
3227
:param key_sizes: An iterable of tuples containing the key and size of each
3229
:param raw_data: A bytestring containing the data.
3230
:return: A list of memos to retrieve the record later. Each memo is an
3231
opaque index memo. For _KnitKeyAccess the memo is (key, pos,
3232
length), where the key is the record key.
3234
if not isinstance(raw_data, bytes):
3235
raise AssertionError(
3236
'data must be plain bytes was %s' % type(raw_data))
3239
# TODO: This can be tuned for writing to sftp and other servers where
3240
# append() is relatively expensive by grouping the writes to each key
3242
for key, size in key_sizes:
3243
path = self._mapper.map(key)
3245
base = self._transport.append_bytes(path + '.knit',
3246
raw_data[offset:offset + size])
3247
except errors.NoSuchFile:
3248
self._transport.mkdir(osutils.dirname(path))
3249
base = self._transport.append_bytes(path + '.knit',
3250
raw_data[offset:offset + size])
3254
result.append((key, base, size))
3258
"""Flush pending writes on this access object.
3260
For .knit files this is a no-op.
3264
def get_raw_records(self, memos_for_retrieval):
3265
"""Get the raw bytes for a records.
3267
:param memos_for_retrieval: An iterable containing the access memo for
3268
retrieving the bytes.
3269
:return: An iterator over the bytes of the records.
3271
# first pass, group into same-index request to minimise readv's issued.
3273
current_prefix = None
3274
for (key, offset, length) in memos_for_retrieval:
3275
if current_prefix == key[:-1]:
3276
current_list.append((offset, length))
3278
if current_prefix is not None:
3279
request_lists.append((current_prefix, current_list))
3280
current_prefix = key[:-1]
3281
current_list = [(offset, length)]
3282
# handle the last entry
3283
if current_prefix is not None:
3284
request_lists.append((current_prefix, current_list))
3285
for prefix, read_vector in request_lists:
3286
path = self._mapper.map(prefix) + '.knit'
3287
for pos, data in self._transport.readv(path, read_vector):
3291
def annotate_knit(knit, revision_id):
3292
"""Annotate a knit with no cached annotations.
3294
This implementation is for knits with no cached annotations.
3295
It will work for knits with cached annotations, but this is not
3298
annotator = _KnitAnnotator(knit)
3299
return iter(annotator.annotate_flat(revision_id))
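# Illustrative use (names invented; assumes 'revision_key' is present in the
# given knit):
#
#   for origin_key, line in annotate_knit(knit, revision_key):
#       print(origin_key, line)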
3302
class _KnitAnnotator(annotate.Annotator):
3303
"""Build up the annotations for a text."""
3305
def __init__(self, vf):
3306
annotate.Annotator.__init__(self, vf)
3308
# TODO: handle Nodes which cannot be extracted
3309
# self._ghosts = set()
3311
# Map from (key, parent_key) => matching_blocks, should be 'use once'
3312
self._matching_blocks = {}
3314
# KnitContent objects
3315
self._content_objects = {}
3316
# The number of children that depend on this fulltext content object
3317
self._num_compression_children = {}
3318
# Delta records that need their compression parent before they can be
3320
self._pending_deltas = {}
3321
# Fulltext records that are waiting for their parents' fulltexts before
# they can be yielded for annotation
3323
self._pending_annotation = {}
3325
self._all_build_details = {}
3327
def _get_build_graph(self, key):
3328
"""Get the graphs for building texts and annotations.
3330
The data you need for creating a full text may be different than the
3331
data you need to annotate that text. (At a minimum, you need both
3332
parents to create an annotation, but only need 1 parent to generate the
3335
:return: A list of (key, index_memo) records, suitable for
3336
passing to read_records_iter to start reading in the raw data from
3342
self._num_needed_children[key] = 1
3344
# get all pending nodes
3345
this_iteration = pending
3346
build_details = self._vf._index.get_build_details(this_iteration)
3347
self._all_build_details.update(build_details)
3348
# new_nodes = self._vf._index._get_entries(this_iteration)
3350
for key, details in viewitems(build_details):
3351
(index_memo, compression_parent, parent_keys,
3352
record_details) = details
3353
self._parent_map[key] = parent_keys
3354
self._heads_provider = None
3355
records.append((key, index_memo))
3356
# Do we actually need to check _annotated_lines?
3357
pending.update([p for p in parent_keys
3358
if p not in self._all_build_details])
3360
for parent_key in parent_keys:
3361
if parent_key in self._num_needed_children:
3362
self._num_needed_children[parent_key] += 1
3364
self._num_needed_children[parent_key] = 1
3365
if compression_parent:
3366
if compression_parent in self._num_compression_children:
3367
self._num_compression_children[compression_parent] += 1
3369
self._num_compression_children[compression_parent] = 1
3371
missing_versions = this_iteration.difference(build_details)
3372
if missing_versions:
3373
for key in missing_versions:
3374
if key in self._parent_map and key in self._text_cache:
3375
# We already have this text ready, we just need to
3376
# yield it later so we get it annotated
3378
parent_keys = self._parent_map[key]
3379
for parent_key in parent_keys:
3380
if parent_key in self._num_needed_children:
3381
self._num_needed_children[parent_key] += 1
3383
self._num_needed_children[parent_key] = 1
3384
pending.update([p for p in parent_keys
3385
if p not in self._all_build_details])
3387
raise errors.RevisionNotPresent(key, self._vf)
3388
# Generally we will want to read the records in reverse order, because
3389
# we find the parent nodes after the children
3391
return records, ann_keys
3393
def _get_needed_texts(self, key, pb=None):
3394
# if True or len(self._vf._immediate_fallback_vfs) > 0:
3395
if len(self._vf._immediate_fallback_vfs) > 0:
3396
# If we have fallbacks, go to the generic path
3397
for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
3402
records, ann_keys = self._get_build_graph(key)
3403
for idx, (sub_key, text, num_lines) in enumerate(
3404
self._extract_texts(records)):
3406
pb.update(gettext('annotating'), idx, len(records))
3407
yield sub_key, text, num_lines
3408
for sub_key in ann_keys:
3409
text = self._text_cache[sub_key]
3410
num_lines = len(text) # bad assumption
3411
yield sub_key, text, num_lines
3413
except errors.RetryWithNewPacks as e:
3414
self._vf._access.reload_or_raise(e)
3415
# The cached build_details are no longer valid
3416
self._all_build_details.clear()
3418
def _cache_delta_blocks(self, key, compression_parent, delta, lines):
3419
parent_lines = self._text_cache[compression_parent]
3420
blocks = list(KnitContent.get_line_delta_blocks(
3421
delta, parent_lines, lines))
3422
self._matching_blocks[(key, compression_parent)] = blocks
3424
def _expand_record(self, key, parent_keys, compression_parent, record,
3427
if compression_parent:
3428
if compression_parent not in self._content_objects:
3429
# Waiting for the parent
3430
self._pending_deltas.setdefault(compression_parent, []).append(
3431
(key, parent_keys, record, record_details))
3433
# We have the basis parent, so expand the delta
3434
num = self._num_compression_children[compression_parent]
3437
base_content = self._content_objects.pop(compression_parent)
3438
self._num_compression_children.pop(compression_parent)
3440
self._num_compression_children[compression_parent] = num
3441
base_content = self._content_objects[compression_parent]
3442
# It is tempting to want to copy_base_content=False for the last
3443
# child object. However, whenever noeol=False,
3444
# self._text_cache[parent_key] is content._lines. So mutating it
3445
# gives very bad results.
3446
# The alternative is to copy the lines into text cache, but then we
3447
# are copying anyway, so just do it here.
3448
content, delta = self._vf._factory.parse_record(
3449
key, record, record_details, base_content,
3450
copy_base_content=True)
3453
content, _ = self._vf._factory.parse_record(
3454
key, record, record_details, None)
3455
if self._num_compression_children.get(key, 0) > 0:
3456
self._content_objects[key] = content
3457
lines = content.text()
3458
self._text_cache[key] = lines
3459
if delta is not None:
3460
self._cache_delta_blocks(key, compression_parent, delta, lines)
3463
def _get_parent_annotations_and_matches(self, key, text, parent_key):
3464
"""Get the list of annotations for the parent, and the matching lines.
3466
:param text: The opaque value given by _get_needed_texts
3467
:param parent_key: The key for the parent text
3468
:return: (parent_annotations, matching_blocks)
3469
parent_annotations is a list as long as the number of lines in
3471
matching_blocks is a list of (parent_idx, text_idx, len) tuples
3472
indicating which lines match between the two texts
3474
block_key = (key, parent_key)
3475
if block_key in self._matching_blocks:
3476
blocks = self._matching_blocks.pop(block_key)
3477
parent_annotations = self._annotations_cache[parent_key]
3478
return parent_annotations, blocks
3479
return annotate.Annotator._get_parent_annotations_and_matches(self,
3480
key, text, parent_key)
3482
def _process_pending(self, key):
3483
"""The content for 'key' was just processed.
3485
Determine if there is any more pending work to be processed.
3488
if key in self._pending_deltas:
3489
compression_parent = key
3490
children = self._pending_deltas.pop(key)
3491
for child_key, parent_keys, record, record_details in children:
3492
lines = self._expand_record(child_key, parent_keys,
3494
record, record_details)
3495
if self._check_ready_for_annotations(child_key, parent_keys):
3496
to_return.append(child_key)
3497
# Also check any children that are waiting for this parent to be
3499
if key in self._pending_annotation:
3500
children = self._pending_annotation.pop(key)
3501
to_return.extend([c for c, p_keys in children
3502
if self._check_ready_for_annotations(c, p_keys)])
3505
def _check_ready_for_annotations(self, key, parent_keys):
3506
"""return true if this text is ready to be yielded.
3508
Otherwise, this will return False, and queue the text into
3509
self._pending_annotation
3511
for parent_key in parent_keys:
3512
if parent_key not in self._annotations_cache:
3513
# still waiting on at least one parent text, so queue it up
3514
# Note that if there are multiple parents, we need to wait
3516
self._pending_annotation.setdefault(parent_key,
3517
[]).append((key, parent_keys))
3521
def _extract_texts(self, records):
3522
"""Extract the various texts needed based on records"""
3523
# We iterate in the order read, rather than a strict order requested
3524
# However, process what we can, and put off to the side things that
3525
# still need parents, cleaning them up when those parents are
3528
# 1) As 'records' are read, see if we can expand these records into
3529
# Content objects (and thus lines)
3530
# 2) If a given line-delta is waiting on its compression parent, it
3531
# gets queued up into self._pending_deltas, otherwise we expand
3532
# it, and put it into self._text_cache and self._content_objects
3533
# 3) If we expanded the text, we will then check to see if all
3534
# parents have also been processed. If so, this text gets yielded,
3535
# else this record gets set aside into pending_annotation
3536
# 4) Further, if we expanded the text in (2), we will then check to
3537
# see if there are any children in self._pending_deltas waiting to
3538
# also be processed. If so, we go back to (2) for those
3539
# 5) Further again, if we yielded the text, we can then check if that
3540
# 'unlocks' any of the texts in pending_annotations, which should
3541
# then get yielded as well
3542
# Note that both steps 4 and 5 are 'recursive' in that unlocking one
3543
# compression child could unlock yet another, and yielding a fulltext
3544
# will also 'unlock' the children that are waiting on that annotation.
3545
# (Though also, unlocking 1 parent's fulltext, does not unlock a child
3546
# if other parents are also waiting.)
3547
# We want to yield content before expanding child content objects, so
3548
# that we know when we can re-use the content lines, and the annotation
3549
# code can know when it can stop caching fulltexts, as well.
3551
# Children that are missing their compression parent
3553
for (key, record, digest) in self._vf._read_records_iter(records):
3555
details = self._all_build_details[key]
3556
(_, compression_parent, parent_keys, record_details) = details
3557
lines = self._expand_record(key, parent_keys, compression_parent,
3558
record, record_details)
3560
# Pending delta should be queued up
3562
# At this point, we may be able to yield this content, if all
3563
# parents are also finished
3564
yield_this_text = self._check_ready_for_annotations(key,
3567
# All parents present
3568
yield key, lines, len(lines)
3569
to_process = self._process_pending(key)
3571
this_process = to_process
3573
for key in this_process:
3574
lines = self._text_cache[key]
3575
yield key, lines, len(lines)
3576
to_process.extend(self._process_pending(key))
3580
from ._knit_load_data_pyx import _load_data_c as _load_data
3581
except ImportError as e:
3582
osutils.failed_to_load_extension(e)
3583
from ._knit_load_data_py import _load_data_py as _load_data