# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append only
writes.

lifeless: the data file is made up of "delta records". each delta record has a delta header
that contains; (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (ie, the delta applied to the parent). the delta also ends with an
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8 e.set('executable', 'yes')
8 if elt.get('executable') == 'yes':
8 ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation
"""

# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?

from cStringIO import StringIO
from itertools import izip, chain

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib.errors import (
    RevisionAlreadyPresent,
from bzrlib.osutils import (
from bzrlib.versionedfile import (
    AbsentContentFactory,
    FulltextContentFactory,

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located. we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
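
# A knit on disk is a pair of files sharing a basename: '<name>.knit' holds the
# compressed delta/fulltext records and '<name>.kndx' holds the index described
# in the module docstring above (illustrative summary, not an exhaustive spec).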


class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = KnitVersionedFiles(None, None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data((rec[1],), rec[3], content.text())


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
        compression_parent = factory.parents[0]
        basis_entry = self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True).next()
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
        # Manually apply the delta because we have one annotated content and
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[-1],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0]
        # XXX: string splitting overhead.
        basis_entry = self._basis_vf.get_record_stream(
            [compression_parent], 'unordered', True).next()
        if basis_entry.storage_kind == 'absent':
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
        basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Manually apply the delta because we have one annotated content and
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())
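
# The adapters above cover the combinations of annotated/plain and
# fulltext/delta knit records; each converts a raw record pulled from one knit
# into the representation another consumer expects. A rough, hypothetical usage
# sketch (assuming an annotated delta record and a basis versionedfile holding
# its compression parent):
#
#   adapter = DeltaAnnotatedToFullText(basis_vf)
#   text = adapter.get_bytes(factory, raw_gzipped_record_bytes)
#
# where `factory` is the KnitContentFactory describing the record.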


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, key, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for key.

        :param parents: The parents.
        :param build_details: The build details as returned from
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.parents = parents
        if build_details[0] == 'line-delta':
            annotated_kind = 'annotated-'
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        raise errors.UnavailableRepresentation(self.key, storage_kind,


class KnitContent(object):
    """Content of a knit version to which deltas can be applied.

    This is always stored in memory as a list of lines with \n at the end,
    plus a flag saying if the final ending is really there or not, because that
    corresponds to the on-disk knit representation.
    """

        self._should_strip_eol = False

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
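
    # Each delta hunk yielded above is (start, end, count, lines): replace
    # self[start:end] with the `count` new lines, which for annotated content
    # are (origin, text) pairs and for plain content are bare strings. For
    # example (illustrative only), a hunk (59, 59, 3, [...]) inserts three
    # lines at line 59 without removing anything.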

    def get_line_delta_blocks(knit_delta, source, target):
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
        target_len = len(target)
        for s_begin, s_end, t_len, new_text in knit_delta:
            true_n = s_begin - s_pos
            # knit deltas do not provide reliable info about whether the
            # last line of a file matches, due to eol handling.
            if source[s_pos + n -1] != target[t_pos + n -1]:
            yield s_pos, t_pos, n
            t_pos += t_len + true_n
        n = target_len - t_pos
        if source[s_pos + n -1] != target[t_pos + n -1]:
        yield s_pos, t_pos, n
        yield s_pos + (target_len - t_pos), target_len, 0


class AnnotatedKnitContent(KnitContent):
    """Annotated content."""

    def __init__(self, lines):
        KnitContent.__init__(self)

        """Return a list of (origin, text) for each content line."""
        lines = self._lines[:]
        if self._should_strip_eol:
            origin, last_line = lines[-1]
            lines[-1] = (origin, last_line.rstrip('\n'))

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count

            lines = [text for origin, text in self._lines]
        except ValueError, e:
            # most commonly (only?) caused by the internal form of the knit
            # missing annotation information because of a bug - see thread
            raise KnitCorrupt(self,
                "line in annotated knit missing annotation information: %s"
        if self._should_strip_eol:
            lines[-1] = lines[-1].rstrip('\n')

        return AnnotatedKnitContent(self._lines[:])


class PlainKnitContent(KnitContent):
    """Unannotated content.

    When annotate[_iter] is called on this content, the same version is reported
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
    objects.
    """

    def __init__(self, lines, version_id):
        KnitContent.__init__(self)
        self._version_id = version_id

        """Return a list of (origin, text) for each content line."""
        return [(self._version_id, line) for line in self._lines]

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
        self._version_id = new_version_id

        return PlainKnitContent(self._lines[:], self._version_id)

        if self._should_strip_eol:
            lines[-1] = lines[-1].rstrip('\n')


class _KnitFactory(object):
    """Base class for common Factory functions."""

    def parse_record(self, version_id, record, record_details,
        base_content, copy_base_content=True):
        """Parse a record into a full content object.

        :param version_id: The official version id for this content
        :param record: The data returned by read_records_iter()
        :param record_details: Details about the record returned by
        :param base_content: If get_build_details returns a compression_parent,
            you must return a base_content here, else use None
        :param copy_base_content: When building from the base_content, decide
            you can either copy it and return a new object, or modify it in
        :return: (content, delta) A Content object and possibly a line-delta,
        """
        method, noeol = record_details
        if method == 'line-delta':
            if copy_base_content:
                content = base_content.copy()
            content = base_content
            delta = self.parse_line_delta(record, version_id)
            content.apply_delta(delta, version_id)
            content = self.parse_fulltext(record, version_id)
        content._should_strip_eol = noeol
        return (content, delta)
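
    # `record_details` above is (method, noeol): `method` is 'fulltext' or
    # 'line-delta' and selects how `record` is interpreted, while `noeol`
    # flags that the logical text lacks a trailing newline and is copied onto
    # the resulting content as _should_strip_eol.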


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    def make(self, lines, version_id):
        num_lines = len(lines)
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))

    def parse_fulltext(self, content, version_id):
        """Convert fulltext to internal representation

        fulltext content is of the format
        revid(utf8) plaintext\n
        internal representation is of the format:
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return AnnotatedKnitContent(lines)
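
    # Serialised annotated fulltext lines look like
    #   "<origin-revid> <text>\n"
    # and parse to (origin, text) tuples, e.g. (illustrative values)
    #   "rev-1 hello world\n"  ->  ('rev-1', 'hello world\n')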

    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))

    def parse_line_delta(self, lines, version_id, plain=False):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])

        :param plain: If True, the lines are returned as a plain
            list without annotations, not as a list of (origin, content) tuples, i.e.
            (start, end, count, [1..count newline])
        """
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
            start, end, count = [int(n) for n in header.split(',')]
            contents = [next().split(' ', 1)[1] for i in xrange(count)]
            result.append((start, end, count, contents))
            start, end, count = [int(n) for n in header.split(',')]
            contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
            result.append((start, end, count, contents))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        header = header.split(',')
        count = int(header[2])
        for i in xrange(count):
            origin, text = next().split(' ', 1)

    def lower_fulltext(self, content):
        """convert a fulltext content record into a serializable form.

        see parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)

    def annotate(self, knit, key):
        content = knit._get_content(key)
        # adjust for the fact that serialised annotations are only key suffixes
        if type(key) == tuple:
            origins = content.annotate()
            for origin, line in origins:
                result.append((prefix + (origin,), line))
            # XXX: This smells a bit. Why would key ever be a non-tuple here?
            # Aren't keys defined to be tuples? -- spiv 20080618
            return content.annotate()


class KnitPlainFactory(_KnitFactory):
    """Factory for creating plain Content objects."""

    def make(self, lines, version_id):
        return PlainKnitContent(lines, version_id)

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        num_lines = len(lines)
        while cur < num_lines:
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, lines[cur:cur+c]

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        header = header.split(',')
        count = int(header[2])
        for i in xrange(count):

    def lower_fulltext(self, content):
        return content.text()

    def lower_line_delta(self, delta):
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))

    def annotate(self, knit, key):
        annotator = _KnitAnnotator(knit)
        return annotator.annotate(key)


def make_file_factory(annotated, mapper):
    """Create a factory for creating a file based KnitVersionedFiles.

    This is only functional enough to run interface tests, it doesn't try to
    provide a full pack environment.

    :param annotated: knit annotations are wanted.
    :param mapper: The mapper from keys to paths.
    """
    def factory(transport):
        index = _KndxIndex(transport, mapper, lambda:None, lambda:True, lambda:True)
        access = _KnitKeyAccess(transport, mapper)
        return KnitVersionedFiles(index, access, annotated=annotated)


def make_pack_factory(graph, delta, keylength):
    """Create a factory for creating a pack based VersionedFiles.

    This is only functional enough to run interface tests, it doesn't try to
    provide a full pack environment.

    :param graph: Store a graph.
    :param delta: Delta compress contents.
    :param keylength: How long should keys be.
    """
    def factory(transport):
        parents = graph or delta
        max_delta_chain = 200
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
            key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
            deltas=delta, add_callback=graph_index.add_nodes)
        access = _DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = KnitVersionedFiles(index, access,
            max_delta_chain=max_delta_chain)
        result.stream = stream
        result.writer = writer


def cleanup_pack_knit(versioned_files):
    versioned_files.stream.close()
    versioned_files.writer.end()
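
# A minimal usage sketch for the two factories above (assuming a `transport`
# and a key `mapper` are already available; the names here are illustrative):
#
#   files = make_file_factory(annotated=True, mapper=mapper)(transport)
#
#   factory = make_pack_factory(graph=True, delta=True, keylength=1)
#   pack_files = factory(transport)
#   # ... add texts via pack_files.add_lines(...) ...
#   cleanup_pack_knit(pack_files)   # closes the write stream and the writer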


class KnitVersionedFiles(VersionedFiles):
    """Storage for many versioned files using knit compression.

    Backend storage is managed by indices and data objects.

    :ivar _index: A _KnitGraphIndex or similar that can describe the
        parents, graph, compression and data location of entries in this
        KnitVersionedFiles. Note that this is only the index for
        *this* vfs; if there are fallbacks they must be queried separately.
    """

    def __init__(self, index, data_access, max_delta_chain=200,
        """Create a KnitVersionedFiles with index and data_access.

        :param index: The index for the knit data.
        :param data_access: The access object to store and retrieve knit
        :param max_delta_chain: The maximum number of deltas to permit during
            insertion. Set to 0 to prohibit the use of deltas.
        :param annotated: Set to True to cause annotations to be calculated and
            stored during insertion.
        """
        self._access = data_access
        self._max_delta_chain = max_delta_chain
            self._factory = KnitAnnotateFactory()
            self._factory = KnitPlainFactory()
        self._fallback_vfs = []

        return "%s(%r, %r)" % (
            self.__class__.__name__,

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._fallback_vfs.append(a_versioned_files)

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        """See VersionedFiles.add_lines()."""
        self._index._check_write_ok()
        self._check_add(key, lines, random_id, check_content)
        # The caller might pass None if there is no graph data, but kndx
        # indexes can't directly store that, so we give them
        # an empty tuple instead.
        return self._add(key, lines, parents,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _add(self, key, lines, parents, parent_texts,
        left_matching_blocks, nostore_sha, random_id):
        """Add a set of lines on top of version specified by parents.

        Any versions not present will be converted into ghosts.
        """
        # first thing, if the content is something we don't need to store, find
        line_bytes = ''.join(lines)
        digest = sha_string(line_bytes)
        if nostore_sha == digest:
            raise errors.ExistingContent

        if parent_texts is None:
        # Do a single query to ascertain parent presence; we only compress
        # against parents in the same kvf.
        present_parent_map = self._index.get_parent_map(parents)
        for parent in parents:
            if parent in present_parent_map:
                present_parents.append(parent)

        # Currently we can only compress against the left most present parent.
        if (len(present_parents) == 0 or
            present_parents[0] != parents[0]):
            # To speed the extract of texts the delta chain is limited
            # to a fixed number of deltas. This should minimize both
            # I/O and the time spent applying deltas.
            delta = self._check_should_delta(present_parents[0])

        text_length = len(line_bytes)
        if lines[-1][-1] != '\n':
            # copy the contents of lines.
            options.append('no-eol')
            lines[-1] = lines[-1] + '\n'

            if type(element) != str:
                raise TypeError("key contains non-strings: %r" % (key,))
        # Knit hunks are still last-element only
        content = self._factory.make(lines, version_id)
        if 'no-eol' in options:
            # Hint to the content object that its text() call should strip the
            content._should_strip_eol = True
        if delta or (self._factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                parent_texts, delta, self._factory.annotated,
                left_matching_blocks)
            options.append('line-delta')
            store_lines = self._factory.lower_line_delta(delta_hunks)
            size, bytes = self._record_to_data(key, digest,
            options.append('fulltext')
            # isinstance is slower and we have no hierarchy.
            if self._factory.__class__ == KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                size, bytes = self._record_to_data(key, digest,
                # get mixed annotation + content and feed it into the
                store_lines = self._factory.lower_fulltext(content)
                size, bytes = self._record_to_data(key, digest,
        access_memo = self._access.add_raw_records([(key, size)], bytes)[0]
        self._index.add_records(
            ((key, options, access_memo, parents),),
        return digest, text_length, content
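
    # Each index record added above is a (key, options, access_memo, parents)
    # tuple: `options` carries the storage method ('fulltext' or 'line-delta')
    # plus 'no-eol' when the text lacks a trailing newline, `access_memo` says
    # where the raw bytes were stored, and `parents` is the parent key tuple.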

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        return self._factory.annotate(self, key)

    def check(self, progress_bar=None):
        """See VersionedFiles.check()."""
        # This doesn't actually test extraction of everything, but that will
        # impact 'bzr check' substantially, and needs to be integrated with
        # care. However, it does check for the obvious problem of a delta with
        keys = self._index.keys()
        parent_map = self.get_parent_map(keys)
            if self._index.get_method(key) != 'fulltext':
                compression_parent = parent_map[key][0]
                if compression_parent not in parent_map:
                    raise errors.KnitCorrupt(self,
                        "Missing basis parent %s for %s" % (
                        compression_parent, key))
        for fallback_vfs in self._fallback_vfs:

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""
        if contains_whitespace(version_id):
            raise InvalidRevisionId(version_id, self)
        self.check_not_reserved_id(version_id)
        # TODO: If random_id==False and the key is already present, we should
        # probably check that the existing content is identical to what is
        # being inserted, and otherwise raise an exception. This would make
        # the bundle code simpler.
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def _check_header(self, key, line):
        rec = self._split_header(line)
        self._check_header_version(rec, key[-1])

    def _check_header_version(self, rec, version_id):
        """Checks the header version on original format knit records.

        These have the last component of the key embedded in the record.
        """
        if rec[1] != version_id:
            raise KnitCorrupt(self,
                'unexpected version, wanted %r, got %r' % (version_id, rec[1]))

    def _check_should_delta(self, parent):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext. It searches for _max_delta_chain parents. When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        for count in xrange(self._max_delta_chain):
                # XXX: Collapse these two queries:
                # Note that this only looks in the index of this particular
                # KnitVersionedFiles, not in the fallbacks. This ensures that
                # we won't store a delta spanning physical repository
                method = self._index.get_method(parent)
            except RevisionNotPresent:
                # Some basis is not locally present: always delta
            index, pos, size = self._index.get_position(parent)
            if method == 'fulltext':
            # We don't explicitly check for presence because this is in an
            # inner loop, and if it's missing it'll fail anyhow.
            # TODO: This should be asking for compression parent, not graph
            parent = self._index.get_parent_map([parent])[parent][0]
            # We couldn't find a fulltext, so we must create a new one
        # Simple heuristic - if the total I/O would be greater as a delta than
        # the originally installed fulltext, we create a new fulltext.
        return fulltext_size > delta_size
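
    # In other words (illustrative numbers): with a 10,000 byte fulltext at the
    # head of the chain and deltas so far totalling 12,000 bytes, reading the
    # delta chain already costs more I/O than a fresh fulltext would, so a new
    # fulltext is stored instead of another delta.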

    def _build_details_to_components(self, build_details):
        """Convert a build_details tuple to a position tuple."""
        # record_details, access_memo, compression_parent
        return build_details[3], build_details[0], build_details[1]
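
    # get_build_details() hands back (index_memo, compression_parent, parents,
    # record_details) per key (see _get_components_positions below); the method
    # above reorders that into the (record_details, index_memo,
    # compression_parent) shape used as a component position.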

    def _get_components_positions(self, keys, allow_missing=False):
        """Produce a map of position data for the components of keys.

        This data is intended to be used for retrieving the knit records.

        A dict of key to (record_details, index_memo, next, parents) is
        method is the way referenced data should be applied.
        index_memo is the handle to pass to the data access to actually get the
        next is the build-parent of the version, or None for fulltexts.
        parents is the version_ids of the parents of this version

        :param allow_missing: If True do not raise an error on a missing component,
        """
        pending_components = keys
        while pending_components:
            build_details = self._index.get_build_details(pending_components)
            current_components = set(pending_components)
            pending_components = set()
            for key, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                method = record_details[0]
                if compression_parent is not None:
                    pending_components.add(compression_parent)
                component_data[key] = self._build_details_to_components(details)
            missing = current_components.difference(build_details)
            if missing and not allow_missing:
                raise errors.RevisionNotPresent(missing.pop(), self)
        return component_data

    def _get_content(self, key, parent_texts={}):
        """Returns a content object that makes up the specified
        """
        cached_version = parent_texts.get(key, None)
        if cached_version is not None:
            # Ensure the cache dict is valid.
            if not self.get_parent_map([key]):
                raise RevisionNotPresent(key, self)
            return cached_version
        text_map, contents_map = self._get_content_maps([key])
        return contents_map[key]

    def _get_content_maps(self, keys, nonlocal_keys=None):
        """Produce maps of text and KnitContents

        :param keys: The keys to produce content maps for.
        :param nonlocal_keys: An iterable of keys (possibly intersecting keys)
            which are known to not be in this knit, but rather in one of the
        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        multiple_versions = len(keys) != 1
        record_map = self._get_record_map(keys, allow_missing=True)

        if nonlocal_keys is None:
            nonlocal_keys = set()
            nonlocal_keys = frozenset(nonlocal_keys)
        missing_keys = set(nonlocal_keys)
        for source in self._fallback_vfs:
            if not missing_keys:
            for record in source.get_record_stream(missing_keys,
                if record.storage_kind == 'absent':
                missing_keys.remove(record.key)
                lines = split_lines(record.get_bytes_as('fulltext'))
                text_map[record.key] = lines
                content_map[record.key] = PlainKnitContent(lines, record.key)
                if record.key in keys:
                    final_content[record.key] = content_map[record.key]
            if key in nonlocal_keys:
            while cursor is not None:
                record, record_details, digest, next = record_map[cursor]
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))

            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                    content, delta = self._factory.parse_record(key[-1],
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content
            final_content[key] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
            text_map[key] = text
        return text_map, final_content

    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.

        :param keys: The keys to look up parents for.
        :return: A mapping from keys to parents. Absent keys are absent from
        """
        return self._get_parent_map_with_sources(keys)[0]

    def _get_parent_map_with_sources(self, keys):
        """Get a map of the parents of keys.

        :param keys: The keys to look up parents for.
        :return: A tuple. The first element is a mapping from keys to parents.
            Absent keys are absent from the mapping. The second element is a
            list with the locations each key was found in. The first element
            is the in-this-knit parents, the second the first fallback source,
        """
        sources = [self._index] + self._fallback_vfs
        for source in sources:
            new_result = source.get_parent_map(missing)
            source_results.append(new_result)
            result.update(new_result)
            missing.difference_update(set(new_result))
        return result, source_results

    def _get_record_map(self, keys, allow_missing=False):
        """Produce a dictionary of knit records.

        :return: {key:(record, record_details, digest, next)}
            data returned from read_records
            opaque information to pass to parse_record
            SHA1 digest of the full text after all steps are done
            build-parent of the version, i.e. the leftmost ancestor.
            Will be None if the record is not a delta.
        :param keys: The keys to build a map for
        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        position_map = self._get_components_positions(keys,
            allow_missing=allow_missing)
        # key = component_id, r = record_details, i_m = index_memo, n = next
        records = [(key, i_m) for key, (r, i_m, n)
                   in position_map.iteritems()]
        for key, record, digest in \
                self._read_records_iter(records):
            (record_details, index_memo, next) = position_map[key]
            record_map[key] = record, record_details, digest, next

    def _split_by_prefix(self, keys):
        """For the given keys, split them up based on their prefix.

        To keep memory pressure somewhat under control, split the
        requests back into per-file-id requests, otherwise "bzr co"
        extracts the full tree into memory before writing it to disk.
        This should be revisited if _get_content_maps() can ever cross

        :param keys: An iterable of key tuples
        :return: A dict of {prefix: [key_list]}
        """
        split_by_prefix = {}
            split_by_prefix.setdefault('', []).append(key)
            split_by_prefix.setdefault(key[0], []).append(key)
        return split_by_prefix
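
    # For example (illustrative keys), text keys such as ('file-id-1', 'rev-a')
    # and ('file-id-1', 'rev-b') end up grouped under the 'file-id-1' prefix,
    # so each per-file batch can be extracted and released before the next one,
    # keeping "bzr co" from holding every text in memory at once.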

    def get_record_stream(self, keys, ordering, include_delta_closure):
        """Get a stream of records for keys.

        :param keys: The keys to include.
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        # keys might be a generator
        if not self._index.has_graph:
            # Cannot topological order when no graph has been stored.
            ordering = 'unordered'
        if include_delta_closure:
            positions = self._get_components_positions(keys, allow_missing=True)
            build_details = self._index.get_build_details(keys)
            # (record_details, access_memo, compression_parent_key)
            positions = dict((key, self._build_details_to_components(details))
                for key, details in build_details.iteritems())
        absent_keys = keys.difference(set(positions))
        # There may be more absent keys : if we're missing the basis component
        # and are trying to include the delta closure.
        if include_delta_closure:
            needed_from_fallback = set()
            # Build up reconstructable_keys dict. key:True in this dict means
            # the key can be reconstructed.
            reconstructable_keys = {}
                    chain = [key, positions[key][2]]
                    needed_from_fallback.add(key)
                while chain[-1] is not None:
                    if chain[-1] in reconstructable_keys:
                        result = reconstructable_keys[chain[-1]]
                            chain.append(positions[chain[-1]][2])
                            # missing basis component
                            needed_from_fallback.add(chain[-1])
                for chain_key in chain[:-1]:
                    reconstructable_keys[chain_key] = result
                    needed_from_fallback.add(key)
        # Double index lookups here : need a unified api ?
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
        if ordering == 'topological':
            # Global topological sort
            present_keys = tsort.topo_sort(global_map)
            # Now group by source:
            current_source = None
            for key in present_keys:
                for parent_map in parent_maps:
                    if key in parent_map:
                        key_source = parent_map
                if current_source is not key_source:
                    source_keys.append((key_source, []))
                    current_source = key_source
                source_keys[-1][1].append(key)
            if ordering != 'unordered':
                raise AssertionError('valid values for ordering are:'
                    ' "unordered" or "topological" not: %r'
            # Just group by source; remote sources first.
            for parent_map in reversed(parent_maps):
                source_keys.append((parent_map, []))
                for key in parent_map:
                    present_keys.append(key)
                    source_keys[-1][1].append(key)
        absent_keys = keys - set(global_map)
        for key in absent_keys:
            yield AbsentContentFactory(key)
        # restrict our view to the keys we can answer.
        # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
        # XXX: At that point we need to consider the impact of double reads by
        # utilising components multiple times.
        if include_delta_closure:
            # XXX: get_content_maps performs its own index queries; allow state
            non_local_keys = needed_from_fallback - absent_keys
            prefix_split_keys = self._split_by_prefix(present_keys)
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
            for prefix, keys in prefix_split_keys.iteritems():
                non_local = prefix_split_non_local_keys.get(prefix, [])
                non_local = set(non_local)
                text_map, _ = self._get_content_maps(keys, non_local)
                    lines = text_map.pop(key)
                    text = ''.join(lines)
                    yield FulltextContentFactory(key, global_map[key], None,
            for source, keys in source_keys:
                if source is parent_maps[0]:
                    # this KnitVersionedFiles
                    records = [(key, positions[key][1]) for key in keys]
                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
                        (record_details, index_memo, _) = positions[key]
                        yield KnitContentFactory(key, global_map[key],
                            record_details, sha1, raw_data, self._factory.annotated, None)
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
                    for record in vf.get_record_stream(keys, ordering,
                        include_delta_closure):

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        record_map = self._get_record_map(missing, allow_missing=True)
        for key, details in record_map.iteritems():
            if key not in missing:
            # record entry 2 is the 'digest'.
            result[key] = details[2]
        missing.difference_update(set(result))
        for source in self._fallback_vfs:
            new_result = source.get_sha1s(missing)
            result.update(new_result)
            missing.difference_update(set(new_result))

    def insert_record_stream(self, stream):
        """Insert a record stream into this container.

        :param stream: A stream of records to insert.

        :seealso VersionedFiles.get_record_stream:
        """
        def get_adapter(adapter_key):
                return adapters[adapter_key]
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
        if self._factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            # self is not annotated, but we can strip annotations cheaply.
            convertibles = set(["knit-annotated-ft-gz"])
            if self._max_delta_chain:
                convertibles.add("knit-annotated-delta-gz")
        # The set of types we can cheaply adapt without needing basis texts.
        native_types = set()
        if self._max_delta_chain:
            native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        # Buffer all index entries that we can't add immediately because their
        # basis parent is missing. We don't buffer all because generating
        # annotations may require access to some of the new records. However we
        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # includes the new keys.
        #
        # See <http://launchpad.net/bugs/300177> about ordering of compression
        # parents in the records - to be conservative, we insist that all
        # parents must be present to avoid expanding to a fulltext.
        #
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            parents = record.parents
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key], self)
            elif ((record.storage_kind in knit_types)
                  or not self._fallback_vfs
                  or not self._index.missing_keys(parents)
                  or self.missing_keys(parents))):
                # we can insert the knit record literally if either it has no
                # compression parent OR we already have its basis in this kvf
                # OR the basis is not present even in the fallbacks. In the
                # last case it will either turn up later in the stream and all
                # will be well, or it won't turn up at all and we'll raise an
                # TODO: self.has_key is somewhat redundant with
                # self._index.has_key; we really want something that directly
                # asks if it's only present in the fallbacks. -- mbp 20081119
                if record.storage_kind not in native_types:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by having the kndx index support raising on a
                # duplicate add with identical parents and options.
                access_memo = self._access.add_raw_records(
                    [(record.key, len(bytes))], bytes)[0]
                index_entry = (record.key, options, access_memo, parents)
                if 'fulltext' not in options:
                    # Not a fulltext, so we need to make sure the compression
                    # parent will also be present.
                    # Note that pack backed knits don't need to buffer here
                    # because they buffer all writes to the transaction level,
                    # but we don't expose that difference at the index level. If
                    # the query here has sufficient cost to show up in
                    # profiling we should do that.
                    # They're required to be physically in this
                    # KnitVersionedFiles, not in a fallback.
                    compression_parent = parents[0]
                    if self.missing_keys([compression_parent]):
                        pending = buffered_index_entries.setdefault(
                            compression_parent, [])
                        pending.append(index_entry)
                    self._index.add_records([index_entry])
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key, parents,
                    split_lines(record.get_bytes_as('fulltext')))
                # Not a fulltext, and not suitable for direct insertion as a
                # delta, either because it's not the right format, or because
                # it depends on a base only present in the fallback kvfs.
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                    self.add_lines(record.key, parents, lines)
                except errors.RevisionAlreadyPresent:
            # Add any records whose basis parent is now available.
            added_keys = [record.key]
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_records(index_entries)
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            from pprint import pformat
            raise errors.BzrCheckError(
                "record_stream refers to compression parents not in %r:\n%s"
                % (self, pformat(sorted(buffered_index_entries.keys()))))

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.

        This may return lines from other keys. Each item the returned
        iterator yields is a tuple of a line and a text version that that line
        is present in (not introduced in).

        Ordering of results is in whatever order is most suitable for the
        underlying storage format.

        If a progress bar is supplied, it may be used to indicate progress.
        The caller is responsible for cleaning up progress bars (because this

        * Lines are normalised by the underlying store: they will all have \\n
        * Lines are returned in arbitrary order.
        * If a requested key did not change any lines (or didn't have any
          lines), it may not be mentioned at all in the result.

        :return: An iterator over (line, key).
        """
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need key, position, length
        build_details = self._index.get_build_details(keys)
        for key, details in build_details.iteritems():
            key_records.append((key, details[0]))
        records_iter = enumerate(self._read_records_iter(key_records))
        for (key_idx, (key, data, sha_value)) in records_iter:
            pb.update('Walking content.', key_idx, total)
            compression_parent = build_details[key][1]
            if compression_parent is None:
                line_iterator = self._factory.get_fulltext_content(data)
                line_iterator = self._factory.get_linedelta_content(data)
            # XXX: It might be more efficient to yield (key,
            # line_iterator) in the future. However for now, this is a simpler
            # change to integrate into the rest of the codebase. RBC 20071110
            for line in line_iterator:
        # If there are still keys we've not yet found, we look in the fallback
        # vfs, and hope to find them there. Note that if the keys are found
        # but had no changes or no content, the fallback may not return
        if keys and not self._fallback_vfs:
            # XXX: strictly the second parameter is meant to be the file id
            # but it's not easily accessible here.
            raise RevisionNotPresent(keys, repr(self))
        for source in self._fallback_vfs:
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
                source_keys.add(key)
            keys.difference_update(source_keys)
        pb.update('Walking content.', total, total)

    def _make_line_delta(self, delta_seq, new_content):
        """Generate a line delta from delta_seq and new_content."""
        for op in delta_seq.get_opcodes():
            if op[0] == 'equal':
            diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))

    def _merge_annotations(self, content, parents, parent_texts={},
                           delta=None, annotated=None,
                           left_matching_blocks=None):
        """Merge annotations for content and generate deltas.

        This is done by comparing the annotations based on changes to the text
        and generating a delta on the resulting full texts. If annotations are
        not being created then a simple delta is created.
        """
        if left_matching_blocks is not None:
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
        for parent_key in parents:
            merge_content = self._get_content(parent_key, parent_texts)
            if (parent_key == parents[0] and delta_seq is not None):
                seq = patiencediff.PatienceSequenceMatcher(
                    None, merge_content.text(), content.text())
            for i, j, n in seq.get_matching_blocks():
                # this copies (origin, text) pairs across to the new
                # content for any line that matches the last-checked
                content._lines[j:j+n] = merge_content._lines[i:i+n]
        # XXX: Robert says the following block is a workaround for a
        # now-fixed bug and it can probably be deleted. -- mbp 20080618
        if content._lines and content._lines[-1][1][-1] != '\n':
            # The copied annotation was from a line without a trailing EOL,
            # reinstate one for the content object, to ensure correct
            line = content._lines[-1][1] + '\n'
            content._lines[-1] = (content._lines[-1][0], line)
        if delta_seq is None:
            reference_content = self._get_content(parents[0], parent_texts)
            new_texts = content.text()
            old_texts = reference_content.text()
            delta_seq = patiencediff.PatienceSequenceMatcher(
                None, old_texts, new_texts)
        return self._make_line_delta(delta_seq, content)

    def _parse_record(self, version_id, data):
        """Parse an original format knit record.

        These have the last element of the key only present in the stored data.
        """
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]

    def _parse_record_header(self, key, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream.
            as (stream, header_record)
        """
        df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(raw_data))
            rec = self._check_header(key, df.readline())
        except Exception, e:
            raise KnitCorrupt(self,
                "While reading {%s} got %s(%s)"
                % (key, e.__class__.__name__, str(e)))

    def _parse_record_unchecked(self, data):
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(data))
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self,
                'incorrect number of lines %s != %s'
                ' for version {%s} %s'
                % (len(record_contents), int(rec[2]),
                   rec[1], record_contents))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self,
                'unexpected version end line %r, wanted %r'
                % (last_line, rec[1]))
        return rec, record_contents

    def _read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.
        :param records: A list of (key, access_memo) entries
        :return: Yields (key, contents, digest) in the order
            read, not the order requested
        """
        # XXX: This smells wrong, IO may not be getting ordered right.
        needed_records = sorted(set(records), key=operator.itemgetter(1))
        if not needed_records:
        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for key, index_memo in needed_records])
        for (key, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(key[-1], data)
            yield key, content, digest

    def _read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.

        Each item the iterator yields is (key, bytes, sha1_of_full_text).
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        # grab the disk data needed.
        needed_offsets = [index_memo for key, index_memo
        raw_records = self._access.get_raw_records(needed_offsets)

        for key, index_memo in records:
            data = raw_records.next()
            # validate the header (note that we can only use the suffix in
            # current knit records).
            df, rec = self._parse_record_header(key, data)
            yield key, data, rec[3]

    def _record_to_data(self, key, digest, lines, dense_lines=None):
        """Convert key, digest, lines into a raw data block.

        :param key: The key of the record. Currently keys are always serialised
            using just the trailing component.
        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in \n,
            dense_lines may be a list with one line in it, containing all the
            1000 lines and their \n's. Using dense_lines if it is already
            known is a win because the string join to create bytes in this
            function spends less time resizing the final string.
        :return: (len, a StringIO instance with the raw data ready to read.)
        """
        # Note: using a string copy here increases memory pressure with e.g.
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
        # when doing the initial commit of a mozilla tree. RBC 20070921
        bytes = ''.join(chain(
            ["version %s %d %s\n" % (key[-1],
            dense_lines or lines,
            ["end %s\n" % key[-1]]))
        if type(bytes) != str:
            raise AssertionError(
                'data must be plain bytes was %s' % type(bytes))
        if lines and lines[-1][-1] != '\n':
            raise ValueError('corrupt lines value %r' % lines)
        compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
        return len(compressed_bytes), compressed_bytes
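
    # The uncompressed record layout produced above is, schematically:
    #
    #   version <version-id> <line-count> <sha1>\n
    #   <line 1>
    #   ...
    #   <line N>
    #   end <version-id>\n
    #
    # and the whole block is gzipped before being handed to the access object.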

    def _split_header(self, line):
            raise KnitCorrupt(self,
                'unexpected number of elements in record header')

        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index] + self._fallback_vfs
        for source in sources:
            result.update(source.keys())


class _KndxIndex(object):
    """Manages knit index files

    The index is kept in memory and read on startup, to enable
    fast lookups of revision information. The cursor of the index
    file is always pointing to the end, making it easy to append

    _cache is a cache for fast mapping from version id to a Index

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number. As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id;
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record. The same revision can be present in an index file more than once.
    The first occurrence gets assigned a sequence number starting from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        that the compressed data starts at.
    LENGTH is the ascii representation of the length of the data file.
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.

    when a write is interrupted to the index file, it will result in a line
    that does not end in ' :'. If the ' :' is not present at the end of a line,
    or at the end of the file, then the record that is missing it will be
    ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write was
    interrupted. As a result it's normal for the last line in the index to be
    missing a trailing newline. One can be added with no harmful effects.

    :ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
        where prefix is e.g. the (fileid,) for .texts instances or () for
        constant-mapped things like .revisions, and the old state is
        tuple(history_vector, cache_dict). This is used to prevent having an
        ABI change with the C extension that reads .kndx files.
    """

    HEADER = "# bzr knit index 8\n"
1767
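
# A purely illustrative example of a small .kndx file (ids, offsets and sizes
# are made up), following the line format described in the class docstring:
#
#   # bzr knit index 8
#   rev-1 fulltext 0 120  :
#   rev-2 line-delta 120 65 0 :
#   rev-3 line-delta 185 80 1 .ghost-rev :
#
# Present parents are referenced by the sequence number of their first
# occurrence (dictionary compression); parents not in this index are written
# as their full revision id prefixed with '.'. A line that does not end in
# ' :' is treated as the result of an interrupted write and ignored.
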
def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
1768
"""Create a _KndxIndex on transport using mapper."""
1769
self._transport = transport
1770
self._mapper = mapper
1771
self._get_scope = get_scope
1772
self._allow_writes = allow_writes
1773
self._is_locked = is_locked
1775
self.has_graph = True
1777
def add_records(self, records, random_id=False):
1778
"""Add multiple records to the index.
1780
:param records: a list of tuples:
1781
(key, options, access_memo, parents).
1782
:param random_id: If True the ids being added were randomly generated
1783
and no check for existence will be performed.
1786
for record in records:
1789
path = self._mapper.map(key) + '.kndx'
1790
path_keys = paths.setdefault(path, (prefix, []))
1791
path_keys[1].append(record)
1792
for path in sorted(paths):
1793
prefix, path_keys = paths[path]
1794
self._load_prefixes([prefix])
1796
orig_history = self._kndx_cache[prefix][1][:]
1797
orig_cache = self._kndx_cache[prefix][0].copy()
1800
for key, options, (_, pos, size), parents in path_keys:
1802
# kndx indices cannot be parentless.
1804
line = "\n%s %s %s %s %s :" % (
1805
key[-1], ','.join(options), pos, size,
1806
self._dictionary_compress(parents))
1807
if type(line) != str:
1808
raise AssertionError(
1809
'data must be utf8 was %s' % type(line))
1811
self._cache_key(key, options, pos, size, parents)
1812
if len(orig_history):
1813
self._transport.append_bytes(path, ''.join(lines))
1815
self._init_index(path, lines)
1817
# If any problems happen, restore the original values and re-raise
1818
self._kndx_cache[prefix] = (orig_cache, orig_history)
1821
def _cache_key(self, key, options, pos, size, parent_keys):
1822
"""Cache a version record in the history array and index cache.
1824
This is inlined into _load_data for performance. KEEP IN SYNC.
1825
(It saves 60ms, 25% of the __init__ overhead on local 4000 record
1829
version_id = key[-1]
1830
# last-element only for compatibility with the C load_data.
1831
parents = tuple(parent[-1] for parent in parent_keys)
1832
for parent in parent_keys:
1833
if parent[:-1] != prefix:
1834
raise ValueError("mismatched prefixes for %r, %r" % (
1836
cache, history = self._kndx_cache[prefix]
1837
# only want the _history index to reference the 1st index entry
1839
if version_id not in cache:
1840
index = len(history)
1841
history.append(version_id)
1843
index = cache[version_id][5]
1844
cache[version_id] = (version_id,
1851
def check_header(self, fp):
1852
line = fp.readline()
1854
# An empty file can actually be treated as though the file doesn't
1856
raise errors.NoSuchFile(self)
1857
if line != self.HEADER:
1858
raise KnitHeaderError(badline=line, filename=self)
1860
def _check_read(self):
1861
if not self._is_locked():
1862
raise errors.ObjectNotLocked(self)
1863
if self._get_scope() != self._scope:
1866
def _check_write_ok(self):
1867
"""Assert if not writes are permitted."""
1868
if not self._is_locked():
1869
raise errors.ObjectNotLocked(self)
1870
if self._get_scope() != self._scope:
1872
if self._mode != 'w':
1873
raise errors.ReadOnlyObjectDirtiedError(self)
1875
def get_build_details(self, keys):
1876
"""Get the method, index_memo and compression parent for keys.
1878
Ghosts are omitted from the result.
1880
:param keys: An iterable of keys.
1881
:return: A dict of key:(index_memo, compression_parent, parents,
1884
opaque structure to pass to read_records to extract the raw
1887
Content that this record is built upon, may be None
1889
Logical parents of this node
1891
extra information about the content which needs to be passed to
1892
Factory.parse_record
1894
prefixes = self._partition_keys(keys)
1895
parent_map = self.get_parent_map(keys)
1898
if key not in parent_map:
1900
method = self.get_method(key)
1901
parents = parent_map[key]
1902
if method == 'fulltext':
1903
compression_parent = None
1905
compression_parent = parents[0]
1906
noeol = 'no-eol' in self.get_options(key)
1907
index_memo = self.get_position(key)
1908
result[key] = (index_memo, compression_parent,
1909
parents, (method, noeol))
1912
def get_method(self, key):
1913
"""Return compression method of specified key."""
1914
options = self.get_options(key)
1915
if 'fulltext' in options:
1917
elif 'line-delta' in options:
1920
raise errors.KnitIndexUnknownMethod(self, options)
1922
def get_options(self, key):
1923
"""Return a list representing options.
1927
prefix, suffix = self._split_key(key)
1928
self._load_prefixes([prefix])
1930
return self._kndx_cache[prefix][0][suffix][1]
1932
raise RevisionNotPresent(key, self)
1934
def get_parent_map(self, keys):
1935
"""Get a map of the parents of keys.
1937
:param keys: The keys to look up parents for.
1938
:return: A mapping from keys to parents. Absent keys are absent from
1941
# Parse what we need to up front, this potentially trades off I/O
1942
# locality (.kndx and .knit in the same block group for the same file
1943
# id) for less checking in inner loops.
1944
prefixes = set(key[:-1] for key in keys)
1945
self._load_prefixes(prefixes)
1950
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
1954
result[key] = tuple(prefix + (suffix,) for
1955
suffix in suffix_parents)
1958
def get_position(self, key):
1959
"""Return details needed to access the version.
1961
:return: a tuple (key, data position, size) to hand to the access
1962
logic to get the record.
1964
prefix, suffix = self._split_key(key)
1965
self._load_prefixes([prefix])
1966
entry = self._kndx_cache[prefix][0][suffix]
1967
return key, entry[2], entry[3]
1969
has_key = _mod_index._has_key_from_parent_map
1971
def _init_index(self, path, extra_lines=[]):
1972
"""Initialize an index."""
1974
sio.write(self.HEADER)
1975
sio.writelines(extra_lines)
1977
self._transport.put_file_non_atomic(path, sio,
1978
create_parent_dir=True)
1979
# self._create_parent_dir)
1980
# mode=self._file_mode,
1981
# dir_mode=self._dir_mode)
1984
"""Get all the keys in the collection.
1986
The keys are not ordered.
1989
# Identify all key prefixes.
1990
# XXX: A bit hacky, needs polish.
1991
if type(self._mapper) == ConstantMapper:
1995
for quoted_relpath in self._transport.iter_files_recursive():
1996
path, ext = os.path.splitext(quoted_relpath)
1998
prefixes = [self._mapper.unmap(path) for path in relpaths]
1999
self._load_prefixes(prefixes)
2000
for prefix in prefixes:
2001
for suffix in self._kndx_cache[prefix][1]:
2002
result.add(prefix + (suffix,))
2005
def _load_prefixes(self, prefixes):
2006
"""Load the indices for prefixes."""
2008
for prefix in prefixes:
2009
if prefix not in self._kndx_cache:
2010
# the load_data interface writes to these variables.
2013
self._filename = prefix
2015
path = self._mapper.map(prefix) + '.kndx'
2016
fp = self._transport.get(path)
2018
# _load_data may raise NoSuchFile if the target knit is
2020
_load_data(self, fp)
2023
self._kndx_cache[prefix] = (self._cache, self._history)
2028
self._kndx_cache[prefix] = ({}, [])
2029
if type(self._mapper) == ConstantMapper:
2030
# preserve behaviour for revisions.kndx etc.
2031
self._init_index(path)
2036
missing_keys = _mod_index._missing_keys_from_parent_map
2038
def _partition_keys(self, keys):
2039
"""Turn keys into a dict of prefix:suffix_list."""
2042
prefix_keys = result.setdefault(key[:-1], [])
2043
prefix_keys.append(key[-1])
2046
def _dictionary_compress(self, keys):
2047
"""Dictionary compress keys.
2049
:param keys: The keys to generate references to.
2050
:return: A string representation of keys. Keys which are present are
dictionary compressed, and others are emitted as fulltext with a
'.' prefix.
"""
result_list = []
prefix = keys[0][:-1]
2058
cache = self._kndx_cache[prefix][0]
2060
if key[:-1] != prefix:
2061
# kndx indices cannot refer across partitioned storage.
2062
raise ValueError("mismatched prefixes for %r" % keys)
2063
if key[-1] in cache:
2064
# -- inlined lookup() --
2065
result_list.append(str(cache[key[-1]][5]))
2066
# -- end lookup () --
2068
result_list.append('.' + key[-1])
2069
return ' '.join(result_list)
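
# Illustrative example (keys made up): if the cache already holds 'rev-a' at
# sequence number 0 and 'rev-b' at sequence number 3, then
# _dictionary_compress([('rev-a',), ('rev-b',), ('ghost',)]) returns
# "0 3 .ghost" - present keys collapse to their sequence numbers, while the
# absent key is emitted in full with a '.' prefix.
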
def _reset_cache(self):
2072
# Possibly this should be a LRU cache. A dictionary from key_prefix to
2073
# (cache_dict, history_vector) for parsed kndx files.
2074
self._kndx_cache = {}
2075
self._scope = self._get_scope()
2076
allow_writes = self._allow_writes()
2082
def _split_key(self, key):
2083
"""Split key into a prefix and suffix."""
2084
return key[:-1], key[-1]
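
# For example (illustrative), a key ('file-1', 'rev-1') splits into the
# prefix ('file-1',) and the suffix 'rev-1'.
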
class _KnitGraphIndex(object):
2088
"""A KnitVersionedFiles index layered on GraphIndex."""
2090
def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2092
"""Construct a KnitGraphIndex on a graph_index.
2094
:param graph_index: An implementation of bzrlib.index.GraphIndex.
2095
:param is_locked: A callback to check whether the object should answer
2097
:param deltas: Allow delta-compressed records.
2098
:param parents: If True, record knit parents; if not, do not record
2100
:param add_callback: If not None, allow additions to the index and call
2101
this callback with a list of added GraphIndex nodes:
2102
[(node, value, node_refs), ...]
2103
:param is_locked: A callback, returns True if the index is locked and
2106
self._add_callback = add_callback
2107
self._graph_index = graph_index
2108
self._deltas = deltas
2109
self._parents = parents
2110
if deltas and not parents:
2111
# XXX: TODO: Delta tree and parent graph should be conceptually
2113
raise KnitCorrupt(self, "Cannot do delta compression without "
2115
self.has_graph = parents
2116
self._is_locked = is_locked
2119
return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2121
def add_records(self, records, random_id=False):
2122
"""Add multiple records to the index.
2124
This function does not insert data into the Immutable GraphIndex
backing the KnitGraphIndex; instead it prepares data for insertion by
the caller, checks that it is safe to insert, and then calls
self._add_callback with the prepared GraphIndex nodes.

:param records: a list of tuples:
2130
(key, options, access_memo, parents).
2131
:param random_id: If True the ids being added were randomly generated
2132
and no check for existence will be performed.
2134
if not self._add_callback:
2135
raise errors.ReadOnlyError(self)
2136
# we hope there are no repositories with inconsistent parentage
2140
for (key, options, access_memo, parents) in records:
2142
parents = tuple(parents)
2143
index, pos, size = access_memo
2144
if 'no-eol' in options:
2148
value += "%d %d" % (pos, size)
2149
if not self._deltas:
2150
if 'line-delta' in options:
2151
raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
2154
if 'line-delta' in options:
2155
node_refs = (parents, (parents[0],))
2157
node_refs = (parents, ())
2159
node_refs = (parents, )
2162
raise KnitCorrupt(self, "attempt to add node with parents "
2163
"in parentless index.")
2165
keys[key] = (value, node_refs)
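
# Illustrative note (numbers made up): for a no-eol line-delta record at
# offset 1200 with length 350, the value built above is the string
# "N1200 350"; records that do end in a newline are assumed to use a leading
# space instead of 'N'. node_refs is (parents, compression_parents) when
# deltas are enabled and (parents,) when only parents are recorded.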
present_nodes = self._get_entries(keys)
2169
for (index, key, value, node_refs) in present_nodes:
2170
if (value[0] != keys[key][0][0] or
2171
node_refs != keys[key][1]):
2172
raise KnitCorrupt(self, "inconsistent details in add_records"
2173
": %s %s" % ((value, node_refs), keys[key]))
2177
for key, (value, node_refs) in keys.iteritems():
2178
result.append((key, value, node_refs))
2180
for key, (value, node_refs) in keys.iteritems():
2181
result.append((key, value))
2182
self._add_callback(result)
2184
def _check_read(self):
2185
"""raise if reads are not permitted."""
2186
if not self._is_locked():
2187
raise errors.ObjectNotLocked(self)
2189
def _check_write_ok(self):
2190
"""Assert if writes are not permitted."""
2191
if not self._is_locked():
2192
raise errors.ObjectNotLocked(self)
2194
def _compression_parent(self, an_entry):
2195
# return the key that an_entry is compressed against, or None
2196
# Grab the second parent list (as deltas implies parents currently)
2197
compression_parents = an_entry[3][1]
2198
if not compression_parents:
2200
if len(compression_parents) != 1:
2201
raise AssertionError(
2202
"Too many compression parents: %r" % compression_parents)
2203
return compression_parents[0]
2205
def get_build_details(self, keys):
2206
"""Get the method, index_memo and compression parent for version_ids.
2208
Ghosts are omitted from the result.
2210
:param keys: An iterable of keys.
2211
:return: A dict of key:
2212
(index_memo, compression_parent, parents, record_details).
2214
opaque structure to pass to read_records to extract the raw
2217
Content that this record is built upon, may be None
2219
Logical parents of this node
2221
extra information about the content which needs to be passed to
2222
Factory.parse_record
2226
entries = self._get_entries(keys, False)
2227
for entry in entries:
2229
if not self._parents:
2232
parents = entry[3][0]
2233
if not self._deltas:
2234
compression_parent_key = None
2236
compression_parent_key = self._compression_parent(entry)
2237
noeol = (entry[2][0] == 'N')
2238
if compression_parent_key:
2239
method = 'line-delta'
2242
result[key] = (self._node_to_position(entry),
2243
compression_parent_key, parents,
2247
def _get_entries(self, keys, check_present=False):
2248
"""Get the entries for keys.
2250
:param keys: An iterable of index key tuples.
2255
for node in self._graph_index.iter_entries(keys):
2257
found_keys.add(node[1])
2259
# adapt parentless index to the rest of the code.
2260
for node in self._graph_index.iter_entries(keys):
2261
yield node[0], node[1], node[2], ()
2262
found_keys.add(node[1])
2264
missing_keys = keys.difference(found_keys)
2266
raise RevisionNotPresent(missing_keys.pop(), self)
2268
def get_method(self, key):
2269
"""Return compression method of specified key."""
2270
return self._get_method(self._get_node(key))
2272
def _get_method(self, node):
2273
if not self._deltas:
2275
if self._compression_parent(node):
2280
def _get_node(self, key):
2282
return list(self._get_entries([key]))[0]
2284
raise RevisionNotPresent(key, self)
2286
def get_options(self, key):
2287
"""Return a list representing options.
2291
node = self._get_node(key)
2292
options = [self._get_method(node)]
2293
if node[2][0] == 'N':
2294
options.append('no-eol')
2297
def get_parent_map(self, keys):
2298
"""Get a map of the parents of keys.
2300
:param keys: The keys to look up parents for.
2301
:return: A mapping from keys to parents. Absent keys are absent from
2305
nodes = self._get_entries(keys)
2309
result[node[1]] = node[3][0]
2312
result[node[1]] = None
2315
def get_position(self, key):
2316
"""Return details needed to access the version.
2318
:return: a tuple (index, data position, size) to hand to the access
2319
logic to get the record.
2321
node = self._get_node(key)
2322
return self._node_to_position(node)
2324
has_key = _mod_index._has_key_from_parent_map
2327
"""Get all the keys in the collection.
2329
The keys are not ordered.
2332
return [node[1] for node in self._graph_index.iter_all_entries()]
2334
missing_keys = _mod_index._missing_keys_from_parent_map
2336
def _node_to_position(self, node):
2337
"""Convert an index value to position details."""
2338
bits = node[2][1:].split(' ')
2339
return node[0], int(bits[0]), int(bits[1])
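
# For example (illustrative), a node value of "N1200 350" yields
# (node[0], 1200, 350): the leading flag byte is skipped and the remaining
# "offset length" pair is parsed back into integers.
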
class _KnitKeyAccess(object):
2343
"""Access to records in .knit files."""
2345
def __init__(self, transport, mapper):
2346
"""Create a _KnitKeyAccess with transport and mapper.
2348
:param transport: The transport the access object is rooted at.
2349
:param mapper: The mapper used to map keys to .knit files.
2351
self._transport = transport
2352
self._mapper = mapper
2354
def add_raw_records(self, key_sizes, raw_data):
2355
"""Add raw knit bytes to a storage area.
2357
The data is spooled to the container writer in one bytes-record per
2360
:param key_sizes: An iterable of tuples containing the key and size of each
2362
:param raw_data: A bytestring containing the data.
2363
:return: A list of memos to retrieve the record later. Each memo is an
2364
opaque index memo. For _KnitKeyAccess the memo is (key, pos,
2365
length), where the key is the record key.
2367
if type(raw_data) != str:
2368
raise AssertionError(
2369
'data must be plain bytes was %s' % type(raw_data))
2372
# TODO: This can be tuned for writing to sftp and other servers where
2373
# append() is relatively expensive by grouping the writes to each key
2375
for key, size in key_sizes:
2376
path = self._mapper.map(key)
2378
base = self._transport.append_bytes(path + '.knit',
2379
raw_data[offset:offset+size])
2380
except errors.NoSuchFile:
2381
self._transport.mkdir(osutils.dirname(path))
2382
base = self._transport.append_bytes(path + '.knit',
2383
raw_data[offset:offset+size])
2387
result.append((key, base, size))
2390
def get_raw_records(self, memos_for_retrieval):
2391
"""Get the raw bytes for a records.
2393
:param memos_for_retrieval: An iterable containing the access memo for
2394
retrieving the bytes.
2395
:return: An iterator over the bytes of the records.
2397
# first pass, group into same-index request to minimise readv's issued.
2399
current_prefix = None
2400
for (key, offset, length) in memos_for_retrieval:
2401
if current_prefix == key[:-1]:
2402
current_list.append((offset, length))
2404
if current_prefix is not None:
2405
request_lists.append((current_prefix, current_list))
2406
current_prefix = key[:-1]
2407
current_list = [(offset, length)]
2408
# handle the last entry
2409
if current_prefix is not None:
2410
request_lists.append((current_prefix, current_list))
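# Illustrative example (keys made up): memos [(('file-1', 'rev-1'), 0, 10),
# (('file-1', 'rev-2'), 10, 20), (('file-2', 'rev-1'), 0, 5)] group by prefix
# into [(('file-1',), [(0, 10), (10, 20)]), (('file-2',), [(0, 5)])], so each
# .knit file below is read with a single readv() call.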
for prefix, read_vector in request_lists:
2412
path = self._mapper.map(prefix) + '.knit'
2413
for pos, data in self._transport.readv(path, read_vector):
2417
class _DirectPackAccess(object):
2418
"""Access to data in one or more packs with less translation."""
2420
def __init__(self, index_to_packs):
2421
"""Create a _DirectPackAccess object.
2423
:param index_to_packs: A dict mapping index objects to the transport
2424
and file names for obtaining data.
2426
self._container_writer = None
2427
self._write_index = None
2428
self._indices = index_to_packs
2430
def add_raw_records(self, key_sizes, raw_data):
2431
"""Add raw knit bytes to a storage area.
2433
The data is spooled to the container writer in one bytes-record per
2436
:param key_sizes: An iterable of tuples containing the key and size of each
2438
:param raw_data: A bytestring containing the data.
2439
:return: A list of memos to retrieve the record later. Each memo is an
2440
opaque index memo. For _DirectPackAccess the memo is (index, pos,
2441
length), where the index field is the write_index object supplied
2442
to the PackAccess object.
2444
if type(raw_data) != str:
2445
raise AssertionError(
2446
'data must be plain bytes was %s' % type(raw_data))
2449
for key, size in key_sizes:
2450
p_offset, p_length = self._container_writer.add_bytes_record(
2451
raw_data[offset:offset+size], [])
2453
result.append((self._write_index, p_offset, p_length))
2456
def get_raw_records(self, memos_for_retrieval):
2457
"""Get the raw bytes for a records.
2459
:param memos_for_retrieval: An iterable containing the (index, pos,
2460
length) memo for retrieving the bytes. The Pack access method
2461
looks up the pack to use for a given record in its index_to_pack
2463
:return: An iterator over the bytes of the records.
2465
# first pass, group into same-index requests
2467
current_index = None
2468
for (index, offset, length) in memos_for_retrieval:
2469
if current_index == index:
2470
current_list.append((offset, length))
2472
if current_index is not None:
2473
request_lists.append((current_index, current_list))
2474
current_index = index
2475
current_list = [(offset, length)]
2476
# handle the last entry
2477
if current_index is not None:
2478
request_lists.append((current_index, current_list))
2479
for index, offsets in request_lists:
2480
transport, path = self._indices[index]
2481
reader = pack.make_readv_reader(transport, path, offsets)
2482
for names, read_func in reader.iter_records():
2483
yield read_func(None)
2485
def set_writer(self, writer, index, transport_packname):
2486
"""Set a writer to use for adding data."""
2487
if index is not None:
2488
self._indices[index] = transport_packname
2489
self._container_writer = writer
2490
self._write_index = index
2493
# Deprecated, use PatienceSequenceMatcher instead
2494
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
2497
def annotate_knit(knit, revision_id):
2498
"""Annotate a knit with no cached annotations.
2500
This implementation is for knits with no cached annotations.
2501
It will work for knits with cached annotations, but this is not
2504
annotator = _KnitAnnotator(knit)
2505
return iter(annotator.annotate(revision_id))
2508
class _KnitAnnotator(object):
2509
"""Build up the annotations for a text."""
2511
def __init__(self, knit):
2514
# Content objects, differs from fulltexts because of how final newlines
2515
# are treated by knits. the content objects here will always have a
2517
self._fulltext_contents = {}
2519
# Annotated lines of specific revisions
2520
self._annotated_lines = {}
2522
# Track the raw data for nodes that we could not process yet.
2523
# This maps the revision_id of the base to a list of children that will
2524
# be annotated from it.
2525
self._pending_children = {}
2527
# Nodes which cannot be extracted
2528
self._ghosts = set()
2530
# Track how many children this node has, so we know if we need to keep
2532
self._annotate_children = {}
2533
self._compression_children = {}
2535
self._all_build_details = {}
2536
# The children => parent revision_id graph
2537
self._revision_id_graph = {}
2539
self._heads_provider = None
2541
self._nodes_to_keep_annotations = set()
2542
self._generations_until_keep = 100
2544
def set_generations_until_keep(self, value):
2545
"""Set the number of generations before caching a node.
2547
Setting this to -1 will cache every merge node, setting this higher
2548
will cache fewer nodes.
2550
self._generations_until_keep = value
2552
def _add_fulltext_content(self, revision_id, content_obj):
2553
self._fulltext_contents[revision_id] = content_obj
2554
# TODO: jam 20080305 It might be good to check the sha1digest here
2555
return content_obj.text()
2557
def _check_parents(self, child, nodes_to_annotate):
2558
"""Check if all parents have been processed.
2560
:param child: A tuple of (rev_id, parents, raw_content)
2561
:param nodes_to_annotate: If child is ready, add it to
2562
nodes_to_annotate, otherwise put it back in self._pending_children
2564
for parent_id in child[1]:
2565
if (parent_id not in self._annotated_lines):
2566
# This parent is present, but another parent is missing
2567
self._pending_children.setdefault(parent_id,
2571
# This one is ready to be processed
2572
nodes_to_annotate.append(child)
2574
def _add_annotation(self, revision_id, fulltext, parent_ids,
2575
left_matching_blocks=None):
2576
"""Add an annotation entry.
2578
All parents should already have been annotated.
2579
:return: A list of children that now have their parents satisfied.
2581
a = self._annotated_lines
2582
annotated_parent_lines = [a[p] for p in parent_ids]
2583
annotated_lines = list(annotate.reannotate(annotated_parent_lines,
2584
fulltext, revision_id, left_matching_blocks,
2585
heads_provider=self._get_heads_provider()))
2586
self._annotated_lines[revision_id] = annotated_lines
2587
for p in parent_ids:
2588
ann_children = self._annotate_children[p]
2589
ann_children.remove(revision_id)
2590
if (not ann_children
2591
and p not in self._nodes_to_keep_annotations):
2592
del self._annotated_lines[p]
2593
del self._all_build_details[p]
2594
if p in self._fulltext_contents:
2595
del self._fulltext_contents[p]
2596
# Now that we've added this one, see if there are any pending
2597
# deltas to be done, certainly this parent is finished
2598
nodes_to_annotate = []
2599
for child in self._pending_children.pop(revision_id, []):
2600
self._check_parents(child, nodes_to_annotate)
2601
return nodes_to_annotate
2603
def _get_build_graph(self, key):
2604
"""Get the graphs for building texts and annotations.
2606
The data you need for creating a full text may be different than the
2607
data you need to annotate that text. (At a minimum, you need both
2608
parents to create an annotation, but only need 1 parent to generate the
2611
:return: A list of (key, index_memo) records, suitable for
2612
passing to read_records_iter to start reading in the raw data.
2615
if key in self._annotated_lines:
2618
pending = set([key])
2623
# get all pending nodes
2625
this_iteration = pending
2626
build_details = self._knit._index.get_build_details(this_iteration)
2627
self._all_build_details.update(build_details)
2628
# new_nodes = self._knit._index._get_entries(this_iteration)
2630
for key, details in build_details.iteritems():
2631
(index_memo, compression_parent, parents,
2632
record_details) = details
2633
self._revision_id_graph[key] = parents
2634
records.append((key, index_memo))
2635
# Do we actually need to check _annotated_lines?
2636
pending.update(p for p in parents
2637
if p not in self._all_build_details)
2638
if compression_parent:
2639
self._compression_children.setdefault(compression_parent,
2642
for parent in parents:
2643
self._annotate_children.setdefault(parent,
2645
num_gens = generation - kept_generation
2646
if ((num_gens >= self._generations_until_keep)
2647
and len(parents) > 1):
2648
kept_generation = generation
2649
self._nodes_to_keep_annotations.add(key)
2651
missing_versions = this_iteration.difference(build_details.keys())
2652
self._ghosts.update(missing_versions)
2653
for missing_version in missing_versions:
2654
# add a key, no parents
2655
self._revision_id_graph[missing_version] = ()
2656
pending.discard(missing_version) # don't look for it
2657
if self._ghosts.intersection(self._compression_children):
2659
"We cannot have nodes which have a ghost compression parent:\n"
2661
"compression children: %r"
2662
% (self._ghosts, self._compression_children))
2663
# Cleanout anything that depends on a ghost so that we don't wait for
2664
# the ghost to show up
2665
for node in self._ghosts:
2666
if node in self._annotate_children:
2667
# We won't be building this node
2668
del self._annotate_children[node]
2669
# Generally we will want to read the records in reverse order, because
2670
# we find the parent nodes after the children
2674
def _annotate_records(self, records):
2675
"""Build the annotations for the listed records."""
2676
# We iterate in the order read, rather than in a strict requested order.
2677
# However, process what we can, and put off to the side things that
2678
# still need parents, cleaning them up when those parents are
2680
for (rev_id, record,
2681
digest) in self._knit._read_records_iter(records):
2682
if rev_id in self._annotated_lines:
2684
parent_ids = self._revision_id_graph[rev_id]
2685
parent_ids = [p for p in parent_ids if p not in self._ghosts]
2686
details = self._all_build_details[rev_id]
2687
(index_memo, compression_parent, parents,
2688
record_details) = details
2689
nodes_to_annotate = []
2690
# TODO: Remove the punning between compression parents, and
2691
# parent_ids, we should be able to do this without assuming
2693
if len(parent_ids) == 0:
2694
# There are no parents for this node, so just add it
2695
# TODO: This probably needs to be decoupled
2696
fulltext_content, delta = self._knit._factory.parse_record(
2697
rev_id, record, record_details, None)
2698
fulltext = self._add_fulltext_content(rev_id, fulltext_content)
2699
nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
2700
parent_ids, left_matching_blocks=None))
2702
child = (rev_id, parent_ids, record)
2703
# Check if all the parents are present
2704
self._check_parents(child, nodes_to_annotate)
2705
while nodes_to_annotate:
2706
# Should we use a queue here instead of a stack?
2707
(rev_id, parent_ids, record) = nodes_to_annotate.pop()
2708
(index_memo, compression_parent, parents,
2709
record_details) = self._all_build_details[rev_id]
2711
if compression_parent is not None:
2712
comp_children = self._compression_children[compression_parent]
2713
if rev_id not in comp_children:
2714
raise AssertionError("%r not in compression children %r"
2715
% (rev_id, comp_children))
2716
# If there is only 1 child, it is safe to reuse this
2718
reuse_content = (len(comp_children) == 1
2719
and compression_parent not in
2720
self._nodes_to_keep_annotations)
2722
# Remove it from the cache since it will be changing
2723
parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
2724
# Make sure to copy the fulltext since it might be
2726
parent_fulltext = list(parent_fulltext_content.text())
2728
parent_fulltext_content = self._fulltext_contents[compression_parent]
2729
parent_fulltext = parent_fulltext_content.text()
2730
comp_children.remove(rev_id)
2731
fulltext_content, delta = self._knit._factory.parse_record(
2732
rev_id, record, record_details,
2733
parent_fulltext_content,
2734
copy_base_content=(not reuse_content))
2735
fulltext = self._add_fulltext_content(rev_id,
2737
if compression_parent == parent_ids[0]:
2738
# the compression_parent is the left parent, so we can
2740
blocks = KnitContent.get_line_delta_blocks(delta,
2741
parent_fulltext, fulltext)
2743
fulltext_content = self._knit._factory.parse_fulltext(
2745
fulltext = self._add_fulltext_content(rev_id,
2747
nodes_to_annotate.extend(
2748
self._add_annotation(rev_id, fulltext, parent_ids,
2749
left_matching_blocks=blocks))
2751
def _get_heads_provider(self):
2752
"""Create a heads provider for resolving ancestry issues."""
2753
if self._heads_provider is not None:
2754
return self._heads_provider
2755
parent_provider = _mod_graph.DictParentsProvider(
2756
self._revision_id_graph)
2757
graph_obj = _mod_graph.Graph(parent_provider)
2758
head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
2759
self._heads_provider = head_cache
2762
def annotate(self, key):
2763
"""Return the annotated fulltext at the given key.
2765
:param key: The key to annotate.
2767
if len(self._knit._fallback_vfs) > 0:
2768
# stacked knits can't use the fast path at present.
2769
return self._simple_annotate(key)
2770
records = self._get_build_graph(key)
2771
if key in self._ghosts:
2772
raise errors.RevisionNotPresent(key, self._knit)
2773
self._annotate_records(records)
2774
return self._annotated_lines[key]
2776
def _simple_annotate(self, key):
2777
"""Return annotated fulltext, rediffing from the full texts.
2779
This is slow but makes no assumptions about the repository
2780
being able to produce line deltas.
2782
# TODO: this code generates a parent map of present ancestors; it
2783
# could be split out into a separate method, and probably should use
2784
# iter_ancestry instead. -- mbp and robertc 20080704
2785
graph = _mod_graph.Graph(self._knit)
2786
head_cache = _mod_graph.FrozenHeadsCache(graph)
2787
search = graph._make_breadth_first_searcher([key])
2791
present, ghosts = search.next_with_ghosts()
2792
except StopIteration:
2794
keys.update(present)
2795
parent_map = self._knit.get_parent_map(keys)
2797
reannotate = annotate.reannotate
2798
for record in self._knit.get_record_stream(keys, 'topological', True):
2800
fulltext = split_lines(record.get_bytes_as('fulltext'))
2801
parents = parent_map[key]
2802
if parents is not None:
2803
parent_lines = [parent_cache[parent] for parent in parent_map[key]]
2806
parent_cache[key] = list(
2807
reannotate(parent_lines, fulltext, key, None, head_cache))
2809
return parent_cache[key]
2811
raise errors.RevisionNotPresent(key, self._knit)
2815
try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data