# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append only
writes.

lifeless: the data file is made up of "delta records".  each delta record has a delta header
that contains; (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with an
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8         e.set('executable', 'yes')
8     if elt.get('executable') == 'yes':
8         ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation
"""

# TODO:
# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled
#                   in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?

from cStringIO import StringIO
from itertools import izip, chain
from zlib import Z_DEFAULT_COMPRESSION

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
""")
from bzrlib.errors import (
    RevisionAlreadyPresent,
    )
from bzrlib.graph import Graph
from bzrlib.osutils import (
    )
from bzrlib.symbol_versioning import (
    DEPRECATED_PARAMETER,
    )
from bzrlib.tsort import topo_sort
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
from bzrlib.versionedfile import (
    AbsentContentFactory,
    )

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'


class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = _KnitData(None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
        return bytes


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
        return bytes


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to plain full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter from annotated deltas to plain full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to plain full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter from plain deltas to plain full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Both contents are plain here, so parse_record can apply the delta
        # to the basis directly.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())
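
# Illustrative usage sketch (not part of the original module): every adapter
# above follows the same shape -- parse a raw gzipped knit record, convert
# its content, and return new bytes or a joined text.  Assuming a record
# taken from an annotated knit's stream:
#
#   adapter = FTAnnotatedToUnannotated(None)
#   raw = record.get_bytes_as(record.storage_kind)
#   plain_bytes = adapter.get_bytes(record, raw)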


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, version, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for version.

        :param version: The version.
        :param parents: The parents.
        :param build_details: The build details as returned from
            get_build_details.
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.sha1 = sha1
        self.key = (version,)
        self.parents = tuple((parent,) for parent in parents)
        if build_details[0] == 'line-delta':
            kind = 'delta'
        else:
            kind = 'ft'
        if annotated:
            annotated_kind = 'annotated-'
        else:
            annotated_kind = ''
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details
        self._knit = knit

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)
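
    # For example (illustrative): an annotated line-delta record reports
    # storage_kind 'knit-annotated-delta-gz', while an unannotated fulltext
    # reports 'knit-ft-gz'.  insert_record_stream() below keys its choice of
    # adapter off these names.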


class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""

    def __init__(self):
        self._should_strip_eol = False

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)

    def cleanup_eol(self, copy_on_mutate=True):
        if self._should_strip_eol:
            if copy_on_mutate:
                self._lines = self._lines[:]
            self.strip_last_line_newline()

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'equal':
                continue
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
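
    # Illustrative sketch (hypothetical data, not in the original source):
    # a line delta is a sequence of (start, end, count, lines) hunks against
    # the old text.  Applying one by hand:
    #
    #   old = ['a\n', 'b\n', 'c\n']
    #   delta = [(1, 2, 1, ['B\n'])]       # replace old[1:2] with ['B\n']
    #   offset = 0
    #   for start, end, count, lines in delta:
    #       old[offset+start:offset+end] = lines
    #       offset = offset + (start - end) + count
    #   # old == ['a\n', 'B\n', 'c\n']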

    @staticmethod
    def get_line_delta_blocks(knit_delta, source, target):
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
        target_len = len(target)
        s_pos = 0
        t_pos = 0
        for s_begin, s_end, t_len, new_text in knit_delta:
            true_n = s_begin - s_pos
            n = true_n
            if n > 0:
                # knit deltas do not provide reliable info about whether the
                # last line of a file matches, due to eol handling.
                if source[s_pos + n - 1] != target[t_pos + n - 1]:
                    n -= 1
                if n > 0:
                    yield s_pos, t_pos, n
            t_pos += t_len + true_n
            s_pos = s_end
        n = target_len - t_pos
        if n > 0:
            if source[s_pos + n - 1] != target[t_pos + n - 1]:
                n -= 1
            if n > 0:
                yield s_pos, t_pos, n
        yield s_pos + (target_len - t_pos), target_len, 0
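
    # Illustrative sketch (hypothetical data): for source ['a\n', 'b\n', 'x\n']
    # and target ['a\n', 'b\n', 'y\n'], the knit delta [(2, 3, 1, ['y\n'])]
    # yields (0, 0, 2) for the matching two-line prefix and then the
    # terminating (3, 3, 0) sentinel, mirroring the output of
    # SequenceMatcher.get_matching_blocks().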


class AnnotatedKnitContent(KnitContent):
    """Annotated content."""

    def __init__(self, lines):
        KnitContent.__init__(self)
        self._lines = lines

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return list(self._lines)

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count

    def strip_last_line_newline(self):
        line = self._lines[-1][1].rstrip('\n')
        self._lines[-1] = (self._lines[-1][0], line)
        self._should_strip_eol = False

    def text(self):
        try:
            lines = [text for origin, text in self._lines]
        except ValueError, e:
            # most commonly (only?) caused by the internal form of the knit
            # missing annotation information because of a bug - see thread
            # around 20071015
            raise KnitCorrupt(self,
                "line in annotated knit missing annotation information: %s"
                % (e,))
        if self._should_strip_eol:
            lines[-1] = lines[-1].rstrip('\n')
        return lines

    def copy(self):
        return AnnotatedKnitContent(self._lines[:])


class PlainKnitContent(KnitContent):
    """Unannotated content.

    When annotate[_iter] is called on this content, the same version is
    reported for all lines.  Generally, annotate[_iter] is not useful on
    PlainKnitContent objects.
    """

    def __init__(self, lines, version_id):
        KnitContent.__init__(self)
        self._lines = lines
        self._version_id = version_id

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return [(self._version_id, line) for line in self._lines]

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
        self._version_id = new_version_id

    def copy(self):
        return PlainKnitContent(self._lines[:], self._version_id)

    def strip_last_line_newline(self):
        self._lines[-1] = self._lines[-1].rstrip('\n')
        self._should_strip_eol = False

    def text(self):
        lines = self._lines
        if self._should_strip_eol:
            lines = lines[:]
            lines[-1] = lines[-1].rstrip('\n')
        return lines


class _KnitFactory(object):
    """Base class for common Factory functions."""

    def parse_record(self, version_id, record, record_details,
                     base_content, copy_base_content=True):
        """Parse a record into a full content object.

        :param version_id: The official version id for this content
        :param record: The data returned by read_records_iter()
        :param record_details: Details about the record returned by
            get_build_details
        :param base_content: If get_build_details returns a compression_parent,
            you must return a base_content here, else use None
        :param copy_base_content: When building from the base_content, you can
            either copy it and return a new object, or modify it in place.
        :return: (content, delta) A Content object and possibly a line-delta,
            delta may be None
        """
        method, noeol = record_details
        if method == 'line-delta':
            assert base_content is not None
            if copy_base_content:
                content = base_content.copy()
            else:
                content = base_content
            delta = self.parse_line_delta(record, version_id)
            content.apply_delta(delta, version_id)
        else:
            delta = None
            content = self.parse_fulltext(record, version_id)
        content._should_strip_eol = noeol
        return (content, delta)
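
    # Illustrative sketch (hypothetical records, not in the original source):
    # rebuilding a delta record on top of its compression parent:
    #
    #   basis, _ = factory.parse_record('v1', v1_record,
    #       ('fulltext', False), None)
    #   content, delta = factory.parse_record('v2', v2_record,
    #       ('line-delta', False), basis, copy_base_content=True)
    #   text = content.text()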


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    annotated = True

    def make(self, lines, version_id):
        num_lines = len(lines)
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))

    def parse_fulltext(self, content, version_id):
        """Convert fulltext to internal representation

        fulltext content is of the format
        revid(utf8) plaintext\n
        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return AnnotatedKnitContent(lines)

    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))

    def parse_line_delta(self, lines, version_id, plain=False):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])

        :param plain: If True, the lines are returned as a plain
            list without annotations, not as a list of (origin, content)
            tuples, i.e. (start, end, count, [1..count newline])
        """
        result = []
        lines = iter(lines)
        next = lines.next

        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [tuple(next().split(' ', 1))
                            for i in xrange(count)]
                result.append((start, end, count, contents))
        return result
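
    # Illustrative sketch (hypothetical data): the serialised delta
    #   ['1,2,1\n', 'rev-1 new text\n']
    # parses to
    #   [(1, 2, 1, [('rev-1', 'new text\n')])]
    # or, with plain=True, to
    #   [(1, 2, 1, ['new text\n'])]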

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text

    def lower_fulltext(self, content):
        """Convert a fulltext content record into a serializable form.

        See parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """Convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out

    def annotate(self, knit, version_id):
        content = knit._get_content(version_id)
        return content.annotate()


class KnitPlainFactory(_KnitFactory):
    """Factory for creating plain Content objects."""

    annotated = False

    def make(self, lines, version_id):
        return PlainKnitContent(lines, version_id)

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, lines[cur:cur+c]
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()

    def lower_fulltext(self, content):
        return content.text()

    def lower_line_delta(self, delta):
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(lines)
        return out

    def annotate(self, knit, version_id):
        annotator = _KnitAnnotator(knit)
        return annotator.annotate(version_id)


def make_empty_knit(transport, relpath):
    """Construct an empty knit at the specified location."""
    k = make_file_knit(relpath, transport, factory=KnitPlainFactory())


def make_file_knit(name, transport, file_mode=None, access_mode='w',
    factory=None, delta=True, create=False, create_parent_dir=False,
    delay_create=False, dir_mode=None, get_scope=None):
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
    if factory is None:
        factory = KnitAnnotateFactory()
    if get_scope is None:
        get_scope = lambda: None
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
        access_mode, create=create, file_mode=file_mode,
        create_parent_dir=create_parent_dir, delay_create=delay_create,
        dir_mode=dir_mode, get_scope=get_scope)
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
        dir_mode, ((create and not len(index)) and delay_create),
        create_parent_dir)
    return KnitVersionedFile(name, transport, factory=factory,
        create=create, delay_create=delay_create, index=index,
        access_method=access)


def get_suffixes():
    """Return the suffixes used by file based knits."""
    return [DATA_SUFFIX, INDEX_SUFFIX]
make_file_knit.get_suffixes = get_suffixes
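
# Illustrative usage sketch (hypothetical paths, not in the original module):
#
#   from bzrlib.transport import get_transport
#   knit = make_file_knit('example', get_transport('.'), create=True)
#   knit.add_lines('rev-1', [], ['hello\n'])
#   knit.add_lines('rev-2', ['rev-1'], ['hello\n', 'world\n'])
#   assert knit.get_text('rev-2') == 'hello\nworld\n'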


class KnitVersionedFile(VersionedFile):
    """Weave-like structure with faster random access.

    A knit stores a number of texts and a summary of the relationships
    between them.  Texts are identified by a string version-id.  Texts
    are normally stored and retrieved as a series of lines, but can
    also be passed as single strings.

    Lines are stored with the trailing newline (if any) included, to
    avoid special cases for files with no final newline.  Lines are
    composed of 8-bit characters, not unicode.  The combination of
    these approaches should mean any 'binary' file can be safely
    stored and retrieved.
    """

    def __init__(self, relpath, transport, file_mode=None,
        factory=None, delta=True, create=False, create_parent_dir=False,
        delay_create=False, dir_mode=None, index=None, access_method=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        :param index: An index to use for the knit.
        """
        super(KnitVersionedFile, self).__init__()
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.delta = delta
        self._max_delta_chain = 200
        if None in (access_method, index):
            raise ValueError("No default access_method or index any more")
        self._index = index
        _access = access_method
        if create and not len(self) and not delay_create:
            _access.create()
        self._data = _KnitData(_access)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
            self.transport.abspath(self.filename))

    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext.  It searches for _max_delta_chain parents.  When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            index, pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parent_map([parent])[parent]
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size
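
    # For example (illustrative): with three deltas of sizes 10, 20 and 30
    # between this text and the nearest fulltext of size 50, delta_size (60)
    # exceeds fulltext_size (50), so this returns False and the caller
    # stores a new fulltext instead of extending the chain.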

    def _check_write_ok(self):
        return self._index._check_write_ok()

    def _add_raw_records(self, records, data):
        """Add all the records 'records' with data pre-joined in 'data'.

        :param records: A list of tuples(version_id, options, parents, size).
        :param data: The data for the records. When it is written, the records
            are adjusted to have pos pointing into data by the sum of
            the preceding records sizes.
        """
        raw_record_sizes = [record[3] for record in records]
        positions = self._data.add_raw_records(raw_record_sizes, data)
        index_entries = []
        for (version_id, options, parents, size), access_memo in zip(
                records, positions):
            index_entries.append((version_id, options, access_memo, parents))
        self._index.add_versions(index_entries)

    def copy_to(self, name, transport):
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
            self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

    def get_data_stream(self, required_versions):
        """Get a data stream for the specified versions.

        Versions may be returned in any order, not necessarily the order
        specified.  They are returned in a partial order by compression
        parent, so that the deltas can be applied as the data stream is
        inserted; however note that compression parents will not be sent
        unless they were specifically requested, as the client may already
        have them.

        :param required_versions: The exact set of versions to be extracted.
            Unlike some other knit methods, this is not used to generate a
            transitive closure, rather it is used precisely as given.

        :returns: format_signature, list of (version, options, length, parents),
            reader_callable.
        """
        required_version_set = frozenset(required_versions)
        version_index = {}
        # list of revisions that can just be sent without waiting for their
        # compression parent
        ready_to_send = []
        # map from revision to the children based on it
        deferred = {}
        # first, read all relevant index data, enough to sort into the right
        # order to return
        for version_id in required_versions:
            options = self._index.get_options(version_id)
            parents = self._index.get_parents_with_ghosts(version_id)
            index_memo = self._index.get_position(version_id)
            version_index[version_id] = (index_memo, options, parents)
            if ('line-delta' in options
                and parents[0] in required_version_set):
                # must wait until the parent has been sent
                deferred.setdefault(parents[0], []). \
                    append(version_id)
            else:
                # either a fulltext, or a delta whose parent the client did
                # not ask for and presumably already has
                ready_to_send.append(version_id)
        # build a list of results to return, plus instructions for data to
        # read from the file
        copy_queue_records = []
        temp_version_list = []
        while ready_to_send:
            # XXX: pushing and popping lists may be a bit inefficient
            version_id = ready_to_send.pop(0)
            (index_memo, options, parents) = version_index[version_id]
            copy_queue_records.append((version_id, index_memo))
            none, data_pos, data_size = index_memo
            temp_version_list.append((version_id, options, data_size,
                parents))
            if version_id in deferred:
                # now we can send all the children of this revision - we could
                # put them in anywhere, but we hope that sending them soon
                # after the fulltext will give good locality in the receiver
                ready_to_send[:0] = deferred.pop(version_id)
        assert len(deferred) == 0, \
            "Still have compressed child versions waiting to be sent"
        # XXX: The stream format is such that we cannot stream it - we have to
        # know the length of all the data a-priori.
        raw_datum = []
        result_version_list = []
        for (version_id, raw_data, _), \
            (version_id2, options, _, parents) in \
            izip(self._data.read_records_iter_raw(copy_queue_records),
                 temp_version_list):
            assert version_id == version_id2, \
                'logic error, inconsistent results'
            raw_datum.append(raw_data)
            result_version_list.append(
                (version_id, options, len(raw_data), parents))
        # provide a callback to get data incrementally.
        pseudo_file = StringIO(''.join(raw_datum))
        def read(length):
            if length is None:
                return pseudo_file.read()
            else:
                return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)
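
    # Illustrative sketch (hypothetical knits): the returned triple can be
    # fed straight into another knit:
    #
    #   format, version_list, read = source.get_data_stream(['rev-1'])
    #   target.insert_data_stream((format, version_list, read))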

    def get_record_stream(self, versions, ordering, include_delta_closure):
        """Get a stream of records for versions.

        :param versions: The versions to include. Each version is a tuple
            (version,).
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        if include_delta_closure:
            # Nb: what we should do is plan the data to stream to allow
            # reconstruction of all the texts without excessive buffering,
            # including re-sending common bases as needed. This makes the most
            # sense when we start serialising these streams though, so for now
            # we just fallback to individual text construction behind the
            # abstraction barrier.
            knit = self
        else:
            knit = None
        # Double index lookups here: need a unified API?
        parent_map = self.get_parent_map(versions)
        absent_versions = set(versions) - set(parent_map)
        if ordering == 'topological':
            present_versions = topo_sort(parent_map)
        else:
            # List comprehension to keep the requested order (as that seems
            # marginally useful, at least until we start doing IO optimising
            # here).
            present_versions = [version for version in versions if version in
                parent_map]
        position_map = self._get_components_positions(present_versions)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(version, position_map[version][1]) for version in
            present_versions]
        for version in absent_versions:
            yield AbsentContentFactory((version,))
        for version, raw_data, sha1 in \
                self._data.read_records_iter_raw(records):
            (record_details, index_memo, _) = position_map[version]
            yield KnitContentFactory(version, parent_map[version],
                record_details, sha1, raw_data, self.factory.annotated, knit)

    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None
        parent, sha1, noeol, delta = self.get_delta(version_id)
        return KnitContent.get_line_delta_blocks(delta, source, target)

    def get_delta(self, version_id):
        """Get a delta for constructing version from some other version."""
        self.check_not_reserved_id(version_id)
        parents = self.get_parent_map([version_id])[version_id]
        if len(parents):
            parent = parents[0]
        else:
            parent = None
        index_memo = self._index.get_position(version_id)
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
        noeol = 'no-eol' in self._index.get_options(version_id)
        if 'fulltext' == self._index.get_method(version_id):
            new_content = self.factory.parse_fulltext(data, version_id)
            if parent is not None:
                reference_content = self._get_content(parent)
                old_texts = reference_content.text()
            else:
                old_texts = []
            new_texts = new_content.text()
            delta_seq = patiencediff.PatienceSequenceMatcher(None, old_texts,
                new_texts)
            return parent, sha1, noeol, self._make_line_delta(delta_seq, new_content)
        else:
            delta = self.factory.parse_line_delta(data, version_id)
            return parent, sha1, noeol, delta

    def get_format_signature(self):
        """See VersionedFile.get_format_signature()."""
        if self.factory.annotated:
            annotated_part = "annotated"
        else:
            annotated_part = "plain"
        return "knit-%s" % (annotated_part,)

    @deprecated_method(one_four)
    def get_graph_with_ghosts(self):
        """See VersionedFile.get_graph_with_ghosts()."""
        return self.get_parent_map(self.versions())

    def get_sha1s(self, version_ids):
        """See VersionedFile.get_sha1s()."""
        record_map = self._get_record_map(version_ids)
        # record entry 2 is the 'digest'.
        return [record_map[v][2] for v in version_ids]

    @deprecated_method(one_four)
    def has_ghost(self, version_id):
        """True if there is a ghost reference in the file to version_id."""
        # maybe we have it
        if self.has_version(version_id):
            return False
        # optimisable if needed by memoising the _ghosts set.
        items = self.get_parent_map(self.versions())
        for parents in items.itervalues():
            for parent in parents:
                if parent == version_id and parent not in items:
                    return True
        return False

    def insert_data_stream(self, (format, data_list, reader_callable)):
        """Insert knit records from a data stream into this knit.

        If a version in the stream is already present in this knit, it will
        not be inserted a second time.  It will be checked for consistency
        with the stored version however, and may cause a KnitCorrupt error to
        be raised if the data in the stream disagrees with the already stored
        data.

        :seealso: get_data_stream
        """
        if format != self.get_format_signature():
            if 'knit' in debug.debug_flags:
                trace.mutter(
                    'incompatible format signature inserting to %r', self)
            source = self._knit_from_datastream(
                (format, data_list, reader_callable))
            stream = source.get_record_stream(source.versions(), 'unordered', False)
            self.insert_record_stream(stream)
            return

        for version_id, options, length, parents in data_list:
            if self.has_version(version_id):
                # First check: the list of parents.
                my_parents = self.get_parents_with_ghosts(version_id)
                if tuple(my_parents) != tuple(parents):
                    # XXX: KnitCorrupt is not quite the right exception here.
                    raise KnitCorrupt(
                        self.filename,
                        'parents list %r from data stream does not match '
                        'already recorded parents %r for %s'
                        % (parents, my_parents, version_id))

                # Also check the SHA-1 of the fulltext this content will
                # produce.
                raw_data = reader_callable(length)
                my_fulltext_sha1 = self.get_sha1s([version_id])[0]
                df, rec = self._data._parse_record_header(version_id, raw_data)
                stream_fulltext_sha1 = rec[3]
                if my_fulltext_sha1 != stream_fulltext_sha1:
                    # Actually, we don't know if it's this knit that's corrupt,
                    # or the data stream we're trying to insert.
                    raise KnitCorrupt(
                        self.filename, 'sha-1 does not match %s' % version_id)
            else:
                if 'line-delta' in options:
                    # Make sure that this knit record is actually useful: a
                    # line-delta is no use unless we have its parent.
                    # Fetching from a broken repository with this problem
                    # shouldn't break the target repository.
                    #
                    # See https://bugs.launchpad.net/bzr/+bug/164443
                    if not self._index.has_version(parents[0]):
                        raise KnitCorrupt(
                            self.filename,
                            'line-delta from stream '
                            'for version %s '
                            'references '
                            'missing parent %s\n'
                            'Try running "bzr check" '
                            'on the source repository, and "bzr reconcile" '
                            'if necessary.' %
                            (version_id, parents[0]))
                    if not self.delta:
                        # We received a line-delta record for a non-delta
                        # knit.  Convert it to a fulltext.
                        gzip_bytes = reader_callable(length)
                        lines, sha1 = self._data._parse_record(
                            version_id, gzip_bytes)
                        delta = self.factory.parse_line_delta(lines,
                            version_id)
                        content = self.factory.make(
                            self.get_lines(parents[0]), parents[0])
                        content.apply_delta(delta, version_id)
                        digest, _, content = self.add_lines(
                            version_id, parents, content.text())
                        if digest != sha1:
                            raise errors.VersionedFileInvalidChecksum(
                                version_id)
                        continue

                self._add_raw_records(
                    [(version_id, options, parents, length)],
                    reader_callable(length))

    def _knit_from_datastream(self, (format, data_list, reader_callable)):
        """Create a knit object from a data stream.

        This method exists to allow conversion of data streams that do not
        match the signature of this knit.  Generally it will be slower and
        use more memory to use this method to insert data, but it will work.

        :seealso: get_data_stream for details on datastreams.
        :return: A knit versioned file which can be used to join the datastream
            into self.
        """
        if format == "knit-plain":
            factory = KnitPlainFactory()
        elif format == "knit-annotated":
            factory = KnitAnnotateFactory()
        else:
            raise errors.KnitDataStreamUnknown(format)
        index = _StreamIndex(data_list, self._index)
        access = _StreamAccess(reader_callable, index, self, factory)
        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def insert_record_stream(self, stream):
        """Insert a record stream into this versioned file.

        :param stream: A stream of records to insert.
        :return: None
        :seealso VersionedFile.get_record_stream:
        """
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        if self.factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            convertibles = []
        else:
            # self is not annotated, but we can strip annotations cheaply.
            annotated = ""
            convertibles = set(["knit-annotated-delta-gz",
                "knit-annotated-ft-gz"])
        native_types = set()
        native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        adapters = {}
        for record in stream:
            # adapt to non-tuple interface
            parents = [parent[0] for parent in record.parents]
            if record.storage_kind in knit_types:
                if record.storage_kind not in native_types:
                    try:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                    except KeyError:
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                else:
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by making the kndx index support raise on a duplicate
                # add with identical parents and options.
                self._add_raw_records(
                    [(record.key[0], options, parents, len(bytes))],
                    bytes)
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key[0], parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                try:
                    self.add_lines(record.key[0], parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
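
    # Illustrative sketch (hypothetical knits): copying all texts between
    # two knits with the record-stream API:
    #
    #   stream = source.get_record_stream(source.versions(), 'topological',
    #       include_delta_closure=False)
    #   target.insert_record_stream(stream)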

    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "versions scales with size of history")
        return self._index.get_versions()

    def has_version(self, version_id):
        """See VersionedFile.has_version."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "has_version is a LBYL scenario")
        return self._index.has_version(version_id)

    __contains__ = has_version

    def _merge_annotations(self, content, parents, parent_texts={},
                           delta=None, annotated=None,
                           left_matching_blocks=None):
        """Merge annotations for content.  This is done by comparing
        the annotations based on changes to the text.
        """
        if left_matching_blocks is not None:
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
        else:
            delta_seq = None
        if annotated:
            for parent_id in parents:
                merge_content = self._get_content(parent_id, parent_texts)
                if (parent_id == parents[0] and delta_seq is not None):
                    seq = delta_seq
                else:
                    seq = patiencediff.PatienceSequenceMatcher(
                        None, merge_content.text(), content.text())
                for i, j, n in seq.get_matching_blocks():
                    if n == 0:
                        continue
                    # this appears to copy (origin, text) pairs across to the
                    # new content for any line that matches the last-checked
                    # parent.
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
        if delta:
            if delta_seq is None:
                reference_content = self._get_content(parents[0], parent_texts)
                new_texts = content.text()
                old_texts = reference_content.text()
                delta_seq = patiencediff.PatienceSequenceMatcher(
                    None, old_texts, new_texts)
            return self._make_line_delta(delta_seq, content)

    def _make_line_delta(self, delta_seq, new_content):
        """Generate a line delta from delta_seq and new_content."""
        diff_hunks = []
        for op in delta_seq.get_opcodes():
            if op[0] == 'equal':
                continue
            diff_hunks.append((op[1], op[2], op[4]-op[3],
                new_content._lines[op[3]:op[4]]))
        return diff_hunks

    def _get_components_positions(self, version_ids):
        """Produce a map of position data for the components of versions.

        This data is intended to be used for retrieving the knit records.

        A dict of version_id to (record_details, index_memo, next, parents) is
        returned.
        method is the way referenced data should be applied.
        index_memo is the handle to pass to the data access to actually get the
            data
        next is the build-parent of the version, or None for fulltexts.
        parents is the version_ids of the parents of this version
        """
        component_data = {}
        pending_components = version_ids
        while pending_components:
            build_details = self._index.get_build_details(pending_components)
            current_components = set(pending_components)
            pending_components = set()
            for version_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                method = record_details[0]
                if compression_parent is not None:
                    pending_components.add(compression_parent)
                component_data[version_id] = (record_details, index_memo,
                                              compression_parent)
            missing = current_components.difference(build_details)
            if missing:
                raise errors.RevisionNotPresent(missing.pop(), self.filename)
        return component_data

    def _get_content(self, version_id, parent_texts={}):
        """Returns a content object that makes up the specified
        version."""
        cached_version = parent_texts.get(version_id, None)
        if cached_version is not None:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
            return cached_version

        text_map, contents_map = self._get_content_maps([version_id])
        return contents_map[version_id]

    def _check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        self._index.check_versions_present(version_ids)

    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts,
        nostore_sha, random_id, check_content, left_matching_blocks):
        """See VersionedFile.add_lines_with_ghosts()."""
        self._check_add(version_id, lines, random_id, check_content)
        return self._add(version_id, lines, parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _add_lines(self, version_id, parents, lines, parent_texts,
        left_matching_blocks, nostore_sha, random_id, check_content):
        """See VersionedFile.add_lines."""
        self._check_add(version_id, lines, random_id, check_content)
        self._check_versions_present(parents)
        return self._add(version_id, lines[:], parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _check_add(self, version_id, lines, random_id, check_content):
        """Check that version_id and lines are safe to add."""
        if contains_whitespace(version_id):
            raise InvalidRevisionId(version_id, self.filename)
        self.check_not_reserved_id(version_id)
        # Technically this could be avoided if we are happy to allow duplicate
        # id insertion when other things than bzr core insert texts, but it
        # seems useful for folk using the knit api directly to have some safety
        # blanket that we can disable.
        if not random_id and self.has_version(version_id):
            raise RevisionAlreadyPresent(version_id, self.filename)
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def _add(self, version_id, lines, parents, delta, parent_texts,
             left_matching_blocks, nostore_sha, random_id):
        """Add a set of lines on top of version specified by parents.

        If delta is true, compress the text as a line-delta against
        the first parent.

        Any versions not present will be converted into ghosts.
        """
        # first thing, if the content is something we don't need to store, find
        # out.
        line_bytes = ''.join(lines)
        digest = sha_string(line_bytes)
        if nostore_sha == digest:
            raise errors.ExistingContent

        present_parents = []
        if parent_texts is None:
            parent_texts = {}
        for parent in parents:
            if self.has_version(parent):
                present_parents.append(parent)

        # can only compress against the left most present parent.
        if (delta and
            (len(present_parents) == 0 or
             present_parents[0] != parents[0])):
            delta = False

        text_length = len(line_bytes)
        options = []
        if lines:
            if lines[-1][-1] != '\n':
                # copy the contents of lines.
                lines = lines[:]
                options.append('no-eol')
                lines[-1] = lines[-1] + '\n'
                line_bytes += '\n'

        if delta:
            # To speed the extract of texts the delta chain is limited
            # to a fixed number of deltas.  This should minimize both
            # I/O and the time spend applying deltas.
            delta = self._check_should_delta(present_parents)

        assert isinstance(version_id, str)
        content = self.factory.make(lines, version_id)
        if delta or (self.factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                parent_texts, delta, self.factory.annotated,
                left_matching_blocks)

        if delta:
            options.append('line-delta')
            store_lines = self.factory.lower_line_delta(delta_hunks)
            size, bytes = self._data._record_to_data(version_id, digest,
                store_lines)
        else:
            options.append('fulltext')
            # isinstance is slower and we have no hierarchy.
            if self.factory.__class__ == KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                size, bytes = self._data._record_to_data(version_id, digest,
                    lines, [line_bytes])
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self.factory.lower_fulltext(content)
                size, bytes = self._data._record_to_data(version_id, digest,
                    store_lines)

        access_memo = self._data.add_raw_records([size], bytes)[0]
        self._index.add_versions(
            ((version_id, options, access_memo, parents),),
            random_id=random_id)
        return digest, text_length, content

    def check(self, progress_bar=None):
        """See VersionedFile.check()."""

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

    def _get_record_map(self, version_ids):
        """Produce a dictionary of knit records.

        :return: {version_id:(record, record_details, digest, next)}
            record
                data returned from read_records
            record_details
                opaque information to pass to parse_record
            digest
                SHA1 digest of the full text after all steps are done
            next
                build-parent of the version, i.e. the leftmost ancestor.
                Will be None if the record is not a delta.
        """
        position_map = self._get_components_positions(version_ids)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(c, i_m) for c, (r, i_m, n)
                   in position_map.iteritems()]
        record_map = {}
        for component_id, record, digest in \
                self._data.read_records_iter(records):
            (record_details, index_memo, next) = position_map[component_id]
            record_map[component_id] = record, record_details, digest, next

        return record_map

    def get_text(self, version_id):
        """See VersionedFile.get_text"""
        return self.get_texts([version_id])[0]

    def get_texts(self, version_ids):
        return [''.join(l) for l in self.get_line_list(version_ids)]

    def get_line_list(self, version_ids):
        """Return the texts of listed versions as a list of strings."""
        for version_id in version_ids:
            self.check_not_reserved_id(version_id)
        text_map, content_map = self._get_content_maps(version_ids)
        return [text_map[v] for v in version_ids]

    _get_lf_split_line_list = get_line_list

    def _get_content_maps(self, version_ids):
        """Produce maps of text and KnitContents

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
            Both dicts take version_ids as their keys.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        # final output.
        version_ids = list(version_ids)
        multiple_versions = len(version_ids) != 1
        record_map = self._get_record_map(version_ids)

        text_map = {}
        content_map = {}
        final_content = {}
        for version_id in version_ids:
            components = []
            cursor = version_id
            while cursor is not None:
                record, record_details, digest, next = record_map[cursor]
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    break
                cursor = next

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    content, delta = self.factory.parse_record(version_id,
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content

            content.cleanup_eol(copy_on_mutate=multiple_versions)
            final_content[version_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise KnitCorrupt(self.filename,
                    '\n  sha-1 %s'
                    '\n  of reconstructed text does not match'
                    '\n  expected sha-1 %s'
                    '\n  for version %s' %
                    (actual_sha, digest, version_id))
            text_map[version_id] = text
        return text_map, final_content

    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                index_memo = self._index.get_position(version_id)
                version_id_records.append((version_id, index_memo))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
            enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)
            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            # XXX: It might be more efficient to yield (version_id,
            # line_iterator) in the future. However for now, this is a simpler
            # change to integrate into the rest of the codebase. RBC 20071110
            for line in line_iterator:
                yield line, version_id

        pb.update('Walking content.', total, total)

    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()

    __len__ = num_versions

    def annotate(self, version_id):
        """See VersionedFile.annotate."""
        return self.factory.annotate(self, version_id)

    def get_parent_map(self, version_ids):
        """See VersionedFile.get_parent_map."""
        return self._index.get_parent_map(version_ids)

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry(versions, topo_sorted)

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry_with_ghosts(versions)

    def plan_merge(self, ver_a, ver_b):
        """See VersionedFile.plan_merge."""
        ancestors_b = set(self.get_ancestry(ver_b, topo_sorted=False))
        ancestors_a = set(self.get_ancestry(ver_a, topo_sorted=False))
        annotated_a = self.annotate(ver_a)
        annotated_b = self.annotate(ver_b)
        return merge._plan_annotate_merge(annotated_a, annotated_b,
                                          ancestors_a, ancestors_b)


class _KnitComponentFile(object):
    """One of the files used to implement a knit database"""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename

    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                filename=self._transport.abspath(self._filename))

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self._filename)


class _KnitIndex(_KnitComponentFile):
    """Manages knit index file.

    The index is already kept in memory and read on startup, to enable
    fast lookups of revision information.  The cursor of the index
    file is always pointing to the end, making it easy to append
    entries.

    _cache is a cache for fast mapping from version id to a Index
    object.

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number.  As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id;
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record.  The same revision can be present in an index file more than once.
    The first occurrence gets assigned a sequence number starting from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        that the compressed data starts at.
    LENGTH is the ascii representation of the length of the data file.
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
        REVISION_ID.
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.

    partial writes:
    when a write is interrupted to the index file, it will result in a line
    that does not end in ' :'. If the ' :' is not present at the end of a line,
    or at the end of the file, then the record that is missing it will be
    ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write was
    interrupted.  As a result it's normal for the last line in the index to be
    missing a trailing newline.  One can be added with no harmful effects.
    """

    HEADER = "# bzr knit index 8\n"
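
    # Illustrative sketch (hypothetical data): after the header, the index
    # might contain
    #
    #   rev-1 fulltext 0 120  :
    #   rev-2 line-delta 120 80 0 :
    #
    # rev-2 is stored as a line-delta whose single parent is the record with
    # sequence number 0 (rev-1); the trailing ' :' closes each record.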

    # speed of knit parsing went from 280 ms to 280 ms with slots addition.
    # __slots__ = ['_cache', '_history', '_transport', '_filename']

    def _cache_version(self, version_id, options, pos, size, parents):
        """Cache a version record in the history array and index cache.

        This is inlined into _load_data for performance. KEEP IN SYNC.
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
        indexes).
        """
        # only want the _history index to reference the 1st index entry
        # for version_id
        if version_id not in self._cache:
            index = len(self._history)
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)

    def _check_write_ok(self):
        if self._get_scope() != self._scope:
            raise errors.OutSideTransaction()
        if self._mode != 'w':
            raise errors.ReadOnlyObjectDirtiedError(self)

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None,
                 get_scope=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)
        self._scope = get_scope()
        self._get_scope = get_scope

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return topo_sort(graph.items())

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # if not completed
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
            parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            if version_id not in self._cache:
                # ghosts are omitted
                continue
            method = self.get_method(version_id)
            parents = self.get_parents_with_ghosts(version_id)
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = 'no-eol' in self.get_options(version_id)
            index_memo = self.get_position(version_id)
            result[version_id] = (index_memo, compression_parent,
                                  parents, (method, noeol))
        return result
1777
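    # Sketch of a get_build_details() result (editor's illustration, values
    # assumed): for a line-delta record the compression parent is the first
    # parent, for a fulltext it is None:
    #
    #   {'rev-2': ((None, 123, 456),       # index_memo from get_position()
    #              'rev-1',                # compression_parent
    #              ('rev-1',),             # parents
    #              ('line-delta', False))} # record_details: (method, noeol)
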
    def num_versions(self):
        return len(self._history)

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return self._history

    def _version_list_to_index(self, versions):
        result_list = []
        cache = self._cache
        for version in versions:
            if version in cache:
                # -- inlined lookup() --
                result_list.append(str(cache[version][5]))
                # -- end lookup () --
            else:
                result_list.append('.' + version)
        return ' '.join(result_list)

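    # Editor's illustration of the dictionary compression performed by
    # _version_list_to_index() (ids and positions assumed): a parent already
    # cached at history position 3 is written as '3', while an absent
    # (ghost) parent is written literally with a '.' prefix:
    #
    #   _version_list_to_index(['cached-rev', 'ghost-rev'])
    #   # -> '3 .ghost-rev'
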
    def add_version(self, version_id, options, index_memo, parents):
        """Add a version record to the index."""
        self.add_versions(((version_id, options, index_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        :param versions: a list of tuples:
                         (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, (index, pos, size), parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                assert isinstance(line, str), \
                    'content must be utf-8 encoded: %r' % (line,)
                lines.append(line)
                self._cache_version(version_id, options, pos, size, tuple(parents))
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise

    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return details needed to access the version.

        .kndx indices do not support split-out data, so return None for the
        index field.

        :return: a tuple (None, data position, size) to hand to the access
            logic to get the record.
        """
        entry = self._cache[version_id]
        return None, entry[2], entry[3]

    def get_method(self, version_id):
        """Return compression method of specified version."""
        try:
            options = self._cache[version_id][1]
        except KeyError:
            raise RevisionNotPresent(version_id, self._filename)
        if 'fulltext' in options:
            return 'fulltext'
        else:
            if 'line-delta' not in options:
                raise errors.KnitIndexUnknownMethod(self._full_path(), options)
            return 'line-delta'

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        return self._cache[version_id][1]

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        for version_id in version_ids:
            try:
                result[version_id] = tuple(self._cache[version_id][4])
            except KeyError:
                pass
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)


class KnitGraphIndex(object):
    """A knit index that builds on GraphIndex."""

    def __init__(self, graph_index, deltas=False, parents=True, add_callback=None):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param deltas: Allow delta-compressed records.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param parents: If True, record knit parents; if not, do not record
            parents.
        """
        self._graph_index = graph_index
        self._deltas = deltas
        self._add_callback = add_callback
        self._parents = parents
        if deltas and not parents:
            raise KnitCorrupt(self, "Cannot do delta compression without "
                "parents.")

    def _check_write_ok(self):
        pass

    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        :param keys: An iterable of index keys; 1-tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise RevisionNotPresent(missing_keys.pop(), self)

    def _present_keys(self, version_ids):
        return set([
            node[1] for node in self._get_entries(version_ids)])

    def _parentless_ancestry(self, versions):
        """Honour the get_ancestry API for parentless knit indices."""
        wanted_keys = self._version_ids_to_keys(versions)
        present_keys = self._present_keys(wanted_keys)
        missing = set(wanted_keys).difference(present_keys)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
        return list(self._keys_to_version_ids(present_keys))

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        #      it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        ghosts = set()
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            found = set()
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                # don't ask for ghosties - otherwise
                # we can end up looping with pending
                # being entirely ghosted.
                graph[key] = [parent for parent in node_refs[0]
                    if parent not in ghosts]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent not in graph:
                        pending.add(parent)
                found.add(key)
            ghosts.update(this_iteration.difference(found))
        if versions.difference(graph):
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
        if topo_sorted:
            result_keys = topo_sort(graph.items())
        else:
            result_keys = graph.iterkeys()
        return [key[0] for key in result_keys]

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        #      it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                graph[key] = node_refs[0]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent not in graph:
                        pending.add(parent)
            missing_versions = this_iteration.difference(graph)
            missing_needed = versions.intersection(missing_versions)
            if missing_needed:
                raise RevisionNotPresent(missing_needed.pop(), self)
            for missing_version in missing_versions:
                # add a key, no parents
                graph[missing_version] = []
                pending.discard(missing_version) # don't look for it
        result_keys = topo_sort(graph.items())
        return [key[0] for key in result_keys]

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        entries = self._get_entries(self._version_ids_to_keys(version_ids), True)
        for entry in entries:
            version_id = self._keys_to_version_ids((entry[1],))[0]
            if not self._parents:
                parents = ()
            else:
                parents = self._keys_to_version_ids(entry[3][0])
            if not self._deltas:
                compression_parent = None
            else:
                compression_parent_key = self._compression_parent(entry)
                if compression_parent_key:
                    compression_parent = self._keys_to_version_ids(
                        (compression_parent_key,))[0]
                else:
                    compression_parent = None
            noeol = (entry[2][0] == 'N')
            if compression_parent:
                method = 'line-delta'
            else:
                method = 'fulltext'
            result[version_id] = (self._node_to_position(entry),
                                  compression_parent, parents,
                                  (method, noeol))
        return result

    def _compression_parent(self, an_entry):
        # return the key that an_entry is compressed against, or None
        # Grab the second parent list (as deltas implies parents currently)
        compression_parents = an_entry[3][1]
        if not compression_parents:
            return None
        assert len(compression_parents) == 1
        return compression_parents[0]

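    # Editor's sketch of a GraphIndex entry as consumed above (values
    # assumed). Entries are (index, key, value, node_refs); with deltas
    # enabled the second reference list holds the compression parent:
    #
    #   entry = (graph_index, ('rev-2',), 'N123 456',
    #            ((('rev-1',),),    # reference list 0: logical parents
    #             (('rev-1',),)))   # reference list 1: compression parent
    #   # _compression_parent(entry) -> ('rev-1',)
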
    def _get_method(self, node):
        if not self._deltas:
            return 'fulltext'
        if self._compression_parent(node):
            return 'line-delta'
        else:
            return 'fulltext'

    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return [node[1][0] for node in self._graph_index.iter_all_entries()]

    def has_version(self, version_id):
        """True if the version is in the index."""
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1

    def _keys_to_version_ids(self, keys):
        return tuple(key[0] for key in keys)

    def get_position(self, version_id):
        """Return details needed to access the version.

        :return: a tuple (index, data position, size) to hand to the access
            logic to get the record.
        """
        node = self._get_node(version_id)
        return self._node_to_position(node)

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])

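    # Editor's illustration (values assumed): the index value packs a flag
    # byte followed by "pos size", so a value of 'N123 456' means no-eol,
    # byte offset 123 and length 456, and _node_to_position() yields
    # (node[0], 123, 456).
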
    def get_method(self, version_id):
        """Return compression method of specified version."""
        return self._get_method(self._get_node(version_id))

    def _get_node(self, version_id):
        try:
            return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
        except IndexError:
            raise RevisionNotPresent(version_id, self)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        node = self._get_node(version_id)
        options = [self._get_method(node)]
        if node[2][0] == 'N':
            options.append('no-eol')
        return options

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        nodes = self._get_entries(self._version_ids_to_keys(version_ids))
        result = {}
        if self._parents:
            for node in nodes:
                result[node[1][0]] = self._keys_to_version_ids(node[3][0])
        else:
            for node in nodes:
                result[node[1][0]] = ()
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        keys = self._version_ids_to_keys(version_ids)
        present = self._present_keys(keys)
        missing = keys.difference(present)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)

    def add_version(self, version_id, options, access_memo, parents):
        """Add a version record to the index."""
        return self.add_versions(((version_id, options, access_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert, then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param versions: a list of tuples:
                         (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        # anywhere.
        keys = {}
        for (version_id, options, access_memo, parents) in versions:
            index, pos, size = access_memo
            key = (version_id, )
            parents = tuple((parent, ) for parent in parents)
            if 'no-eol' in options:
                value = 'N'
            else:
                value = ' '
            value += "%d %d" % (pos, size)
            if not self._deltas:
                if 'line-delta' in options:
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
            if self._parents:
                if self._deltas:
                    if 'line-delta' in options:
                        node_refs = (parents, (parents[0],))
                    else:
                        node_refs = (parents, ())
                else:
                    node_refs = (parents, )
            else:
                if parents:
                    raise KnitCorrupt(self, "attempt to add node with parents "
                        "in parentless index.")
                node_refs = ()
            keys[key] = (value, node_refs)
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                if (value, node_refs) != keys[key]:
                    raise KnitCorrupt(self, "inconsistent details in add_versions"
                        ": %s %s" % ((value, node_refs), keys[key]))
                del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)

    def _version_ids_to_keys(self, version_ids):
        return set((version_id, ) for version_id in version_ids)

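    # Editor's sketch of the nodes handed to _add_callback by add_versions()
    # (ids and numbers assumed). In a parented, delta-capable index a
    # line-delta record becomes:
    #
    #   (('rev-2',),                # key
    #    'N123 456',                # value: no-eol flag + "pos size"
    #    ((('rev-1',),),            # reference list 0: parents
    #     (('rev-1',),)))           # reference list 1: compression parent
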

class _KnitAccess(object):
    """Access to knit records in a .knit file."""

    def __init__(self, transport, filename, _file_mode, _dir_mode,
                 _need_to_create, _create_parent_dir):
        """Create a _KnitAccess for accessing and inserting data.

        :param transport: The transport the .knit is located on.
        :param filename: The filename of the .knit.
        """
        self._transport = transport
        self._filename = filename
        self._file_mode = _file_mode
        self._dir_mode = _dir_mode
        self._need_to_create = _need_to_create
        self._create_parent_dir = _create_parent_dir

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to wherever the access method is storing data.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is always None
            for the .knit access method.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        if not self._need_to_create:
            base = self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                create_parent_dir=self._create_parent_dir,
                                mode=self._file_mode,
                                dir_mode=self._dir_mode)
            self._need_to_create = False
            base = 0
        result = []
        for size in sizes:
            result.append((None, base, size))
            base += size
        return result

    def create(self):
        """IFF this data access has its own storage area, initialise it.

        :return: None.
        """
        self._transport.put_bytes_non_atomic(self._filename, '',
                                             mode=self._file_mode)

    def open_file(self):
        """IFF this data access can be represented as a single file, open it.

        For knits that are not mapped to a single file on disk this will
        always return None.

        :return: None or a file handle.
        """
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The .knit method ignores
            the index as there is always only a single file.
        :return: An iterator over the bytes of the records.
        """
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
        for pos, data in self._transport.readv(self._filename, read_vector):
            yield data


class _PackAccess(object):
    """Access to knit records via a collection of packs."""

    def __init__(self, index_to_packs, writer=None):
        """Create a _PackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param writer: A tuple (pack.ContainerWriter, write_index) which
            contains the pack to write, and the index that reads from it will
            be associated with.
        """
        if writer:
            self.container_writer = writer[0]
            self.write_index = writer[1]
        else:
            self.container_writer = None
            self.write_index = None
        self.indices = index_to_packs

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is the
            write_index object supplied to the PackAccess object.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        result = []
        offset = 0
        for size in sizes:
            p_offset, p_length = self.container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self.write_index, p_offset, p_length))
        return result

"""Pack based knits do not get individually created."""
2373
def get_raw_records(self, memos_for_retrieval):
2374
"""Get the raw bytes for a records.
2376
:param memos_for_retrieval: An iterable containing the (index, pos,
2377
length) memo for retrieving the bytes. The Pack access method
2378
looks up the pack to use for a given record in its index_to_pack
2380
:return: An iterator over the bytes of the records.
2382
# first pass, group into same-index requests
2384
current_index = None
2385
for (index, offset, length) in memos_for_retrieval:
2386
if current_index == index:
2387
current_list.append((offset, length))
2389
if current_index is not None:
2390
request_lists.append((current_index, current_list))
2391
current_index = index
2392
current_list = [(offset, length)]
2393
# handle the last entry
2394
if current_index is not None:
2395
request_lists.append((current_index, current_list))
2396
for index, offsets in request_lists:
2397
transport, path = self.indices[index]
2398
reader = pack.make_readv_reader(transport, path, offsets)
2399
for names, read_func in reader.iter_records():
2400
yield read_func(None)
2402
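    # Editor's illustration of the same-index grouping above (memos
    # assumed): consecutive memos that share an index are batched into one
    # readv-style request per pack:
    #
    #   memos = [(idx_a, 0, 10), (idx_a, 10, 20), (idx_b, 0, 5)]
    #   # -> request_lists == [(idx_a, [(0, 10), (10, 20)]),
    #   #                      (idx_b, [(0, 5)])]
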
    def open_file(self):
        """Pack based knits have no single file."""
        return None

    def set_writer(self, writer, index, (transport, packname)):
        """Set a writer to use for adding data."""
        if index is not None:
            self.indices[index] = (transport, packname)
        self.container_writer = writer
        self.write_index = index


class _StreamAccess(object):
    """A Knit Access object that provides data from a datastream.

    It also provides a fallback: annotated data from a *backing* access
    object can be presented as unannotated data.

    This is triggered by an index_memo which points to a different index
    than this was constructed with, and is used to allow extracting full
    unannotated texts for insertion into annotated knits.
    """

    def __init__(self, reader_callable, stream_index, backing_knit,
                 orig_factory):
        """Create a _StreamAccess object.

        :param reader_callable: The reader_callable from the datastream.
            This is called to buffer all the data immediately, for
            random access.
        :param stream_index: The index object for the data stream this
            provides access to; native index_memo's will reference it.
        :param backing_knit: The knit object that will provide access to
            annotated texts which are not available in the stream, so as to
            create unannotated texts.
        :param orig_factory: The original content factory used to generate the
            stream. This is used for checking whether the thunk code for
            supporting _copy_texts will generate the correct form of data.
        """
        self.data = reader_callable(None)
        self.stream_index = stream_index
        self.backing_knit = backing_knit
        self.orig_factory = orig_factory

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable of memos from the
            _StreamIndex object identifying bytes to read; for these classes
            they are (from_backing_knit, index, start, end) and can point to
            either the backing knit or streamed data.
        :return: An iterator yielding a byte string for each record in
            memos_for_retrieval.
        """
        # use a generator for memory friendliness
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
            if not from_backing_knit:
                assert version_id is self.stream_index
                yield self.data[start:end]
                continue
            # we have been asked to thunk. This thunking only occurs when
            # we are obtaining plain texts from an annotated backing knit
            # so that _copy_texts will work.
            # We could improve performance here by scanning for where we need
            # to do this and using get_line_list, then interleaving the output
            # as desired. However, for now, this is sufficient.
            if self.orig_factory.__class__ != KnitPlainFactory:
                raise errors.KnitCorrupt(
                    self, 'Bad thunk request %r cannot be backed by %r' %
                        (version_id, self.orig_factory))
            lines = self.backing_knit.get_lines(version_id)
            line_bytes = ''.join(lines)
            digest = sha_string(line_bytes)
            # the packed form of the fulltext always has a trailing newline,
            # even if the actual text does not, unless the file is empty. The
            # record options including the noeol flag are passed through by
            # _StreamIndex, so this is safe.
            if lines:
                if lines[-1][-1] != '\n':
                    lines[-1] = lines[-1] + '\n'
                    line_bytes += '\n'
            # We want plain data, because we expect to thunk only to allow text
            # extraction.
            size, bytes = self.backing_knit._data._record_to_data(version_id,
                digest, lines, line_bytes)
            yield bytes


class _StreamIndex(object):
    """A Knit Index object that uses the data map from a datastream."""

    def __init__(self, data_list, backing_index):
        """Create a _StreamIndex object.

        :param data_list: The data_list from the datastream.
        :param backing_index: The index which will supply values for nodes
            referenced outside of this stream.
        """
        self.data_list = data_list
        self.backing_index = backing_index
        self._by_version = {}
        pos = 0
        for key, options, length, parents in data_list:
            self._by_version[key] = options, (pos, pos + length), parents
            pos += length

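    # Editor's sketch of the mapping built above (ids and sizes assumed):
    # with
    #   data_list = [('rev-1', ['fulltext'], 10, []),
    #                ('rev-2', ['line-delta'], 7, ['rev-1'])]
    # the records are laid end to end in the buffered stream, so
    #   _by_version == {'rev-1': (['fulltext'], (0, 10), []),
    #                   'rev-2': (['line-delta'], (10, 17), ['rev-1'])}
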
    def get_ancestry(self, versions, topo_sorted):
        """Get an ancestry list for versions."""
        if topo_sorted:
            # Not needed for basic joins
            raise NotImplementedError(self.get_ancestry)
        # get a graph of all the mentioned versions:
        # Little ugly - basically copied from KnitIndex, but don't want to
        # accidentally incorporate too much of that index's code.
        ancestry = set()
        pending = set(versions)
        cache = self._by_version
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][2] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in ancestry])
            ancestry.add(version)
        return list(ancestry)

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque memo that can be passed to _StreamAccess.read_records
                to extract the raw data; for these classes it is
                (from_backing_knit, index, start, end)
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            try:
                method = self.get_method(version_id)
            except errors.RevisionNotPresent:
                # ghosts are omitted
                continue
            parent_ids = self.get_parents_with_ghosts(version_id)
            noeol = ('no-eol' in self.get_options(version_id))
            index_memo = self.get_position(version_id)
            from_backing_knit = index_memo[0]
            if from_backing_knit:
                # texts retrieved from the backing knit are always full texts
                method = 'fulltext'
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parent_ids[0]
            result[version_id] = (index_memo, compression_parent,
                                  parent_ids, (method, noeol))
        return result

    def get_method(self, version_id):
        """Return compression method of specified version."""
        options = self.get_options(version_id)
        if 'fulltext' in options:
            return 'fulltext'
        elif 'line-delta' in options:
            return 'line-delta'
        else:
            raise errors.KnitIndexUnknownMethod(self, options)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        try:
            return self._by_version[version_id][0]
        except KeyError:
            options = list(self.backing_index.get_options(version_id))
            if 'fulltext' in options:
                pass
            elif 'line-delta' in options:
                # Texts from the backing knit are always returned from the
                # stream as full texts.
                options.remove('line-delta')
                options.append('fulltext')
            else:
                raise errors.KnitIndexUnknownMethod(self, options)
            return tuple(options)

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        pending_ids = set()
        for version_id in version_ids:
            try:
                result[version_id] = self._by_version[version_id][2]
            except KeyError:
                pending_ids.add(version_id)
        result.update(self.backing_index.get_parent_map(pending_ids))
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def get_position(self, version_id):
        """Return details needed to access the version.

        _StreamAccess has the data as a big array, so we return slice
        coordinates into that (as index_memo's are opaque outside the
        index and matching access class).

        :return: a tuple (from_backing_knit, index, start, end) that can
            be passed e.g. to get_raw_records.
            If from_backing_knit is False, index will be self, otherwise it
            will be a version id.
        """
        try:
            start, end = self._by_version[version_id][1]
            return False, self, start, end
        except KeyError:
            # Signal to the access object to handle this from the backing knit.
            return (True, version_id, None, None)

    def get_versions(self):
        """Get all the versions in the stream."""
        return self._by_version.keys()


class _KnitData(object):
    """Manage extraction of data from a KnitAccess, caching and decompressing.

    The KnitData class provides the logic for parsing and using knit records,
    making use of an access method for the low level read and write operations.
    """

    def __init__(self, access):
        """Create a KnitData object.

        :param access: The access method to use. Access methods such as
            _KnitAccess manage the insertion of raw records and the subsequent
            retrieval of the same.
        """
        self._access = access
        self._checked = False

    def _open_file(self):
        return self._access.open_file()

    def _record_to_data(self, version_id, digest, lines, dense_lines=None):
        """Convert version_id, digest, lines into a raw data block.

        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in \n,
            dense_lines may be a list with one line in it, containing all the
            1000 lines and their \n's. Using dense_lines if it is already
            known is a win because the string join to create bytes in this
            function spends less time resizing the final string.
        :return: (len, a bytestring containing the compressed raw data ready
            to write.)
        """
        # Note: using a string copy here increases memory pressure with e.g.
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
        # when doing the initial commit of a mozilla tree. RBC 20070921
        bytes = ''.join(chain(
            ["version %s %d %s\n" % (version_id,
                                     len(lines),
                                     digest)],
            dense_lines or lines,
            ["end %s\n" % version_id]))
        assert bytes.__class__ == str
        compressed_bytes = bytes_to_gzip(bytes)
        return len(compressed_bytes), compressed_bytes

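    # Editor's illustration of the framing produced above (digest value
    # assumed): the uncompressed bytes for a two-line text are
    #
    #   version rev-1 2 <sha1-hex>\n
    #   first line\n
    #   second line\n
    #   end rev-1\n
    #
    # and dense_lines=['first line\nsecond line\n'] would produce the same
    # stream while making the ''.join() cheaper.
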
    def add_raw_records(self, sizes, raw_data):
        """Append a prepared record to the data file.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: a list of index data for the way the data was stored.
            See the access method add_raw_records documentation for more
            details.
        """
        return self._access.add_raw_records(sizes, raw_data)

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream,
            as (stream, header_record).
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        try:
            rec = self._check_header(version_id, df.readline())
        except Exception, e:
            raise KnitCorrupt(self._access,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        return df, rec

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                              'unexpected number of elements in record header')
        return rec

    def _check_header_version(self, rec, version_id):
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))

    def _check_header(self, version_id, line):
        rec = self._split_header(line)
        self._check_header_version(rec, version_id)
        return rec

    def _parse_record_unchecked(self, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))
        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 rec[1]))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, rec[1]))
        df.close()
        return rec, record_contents

    def _parse_record(self, version_id, data):
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.

        Each item the iterator yields is (version_id, bytes,
            digest).
        """
        # setup an iterator of the external records:
        # uses readv so it's nice and fast, we hope.
        # grab the disk data needed.
        needed_offsets = [index_memo for version_id, index_memo
                                     in records]
        raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            data = raw_records.next()
            # validate the header
            df, rec = self._parse_record_header(version_id, data)
            df.close()
            yield version_id, data, rec[3]

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever order is fastest to read,
        not the order requested. Also, multiple requests for the same
        record will only yield one response.

        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
            read, not the order requested
        """
        if not records:
            return

        needed_records = sorted(set(records), key=operator.itemgetter(1))
        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for version_id, index_memo in needed_records])

        for (version_id, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(version_id, data)
            yield version_id, content, digest

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components


class InterKnit(InterVersionedFile):
    """Optimised code paths for knit to knit operations."""

    _matching_file_from_factory = staticmethod(make_file_knit)
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with knits."""
        try:
            return (isinstance(source, KnitVersionedFile) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
        """Copy texts to the target by extracting and adding them one by one.

        See join() for the parameter definitions.
        """
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        # --- the below is factorable out with VersionedFile.join, but wait for
        # VersionedFiles, it may all be simpler then.
        graph = Graph(self.source)
        search = graph._make_breadth_first_searcher(version_ids)
        transitive_ids = set()
        map(transitive_ids.update, list(search))
        parent_map = self.source.get_parent_map(transitive_ids)
        order = topo_sort(parent_map.items())

        def size_of_content(content):
            return sum(len(line) for line in content.text())
        # Cache at most 10MB of parent texts
        parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                              compute_size=size_of_content)
        # TODO: jam 20071116 It would be nice to have a streaming interface to
        #       get multiple texts from a source. The source could be smarter
        #       about how it handled intermediate stages.
        #       get_line_list() or make_mpdiffs() seem like a possibility, but
        #       at the moment they extract all full texts into memory, which
        #       causes us to store more than our 3x fulltext goal.
        #       Repository.iter_files_bytes() may be another possibility
        to_process = [version for version in order
                      if version not in self.target]
        total = len(to_process)
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for index, version in enumerate(to_process):
                pb.update('Converting versioned data', index, total)
                sha1, num_bytes, parent_text = self.target.add_lines(version,
                    self.source.get_parents_with_ghosts(version),
                    self.source.get_lines(version),
                    parent_texts=parent_cache)
                parent_cache[version] = parent_text
        finally:
            pb.finished()
        return total

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, KnitVersionedFile)
        assert isinstance(self.target, KnitVersionedFile)

        # If the source and target are mismatched w.r.t. annotations vs
        # plain, the data needs to be converted accordingly.
        if self.source.factory.annotated == self.target.factory.annotated:
            converter = None
        elif self.source.factory.annotated:
            converter = self._anno_to_plain_converter
        else:
            # We're converting from a plain to an annotated knit. Copy them
            # across by full texts.
            return self._copy_texts(pb, msg, version_ids, ignore_missing)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)
            if None in version_ids:
                version_ids.remove(None)

            self.source_ancestry = set(self.source.get_ancestry(version_ids,
                topo_sorted=False))
            this_versions = set(self.target._index.get_versions())
            # XXX: For efficiency we should not look at the whole index,
            #      we only need to consider the referenced revisions - they
            #      must all be present, or the method must be full-text.
            #      TODO, RBC 20070919
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # plan the join:
            copy_queue = []
            copy_queue_records = []
            copy_set = set()
            for version_id in version_list:
                options = self.source._index.get_options(version_id)
                parents = self.source._index.get_parents_with_ghosts(version_id)
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must:
                    #   * already have it or
                    #   * have it scheduled already
                    # otherwise we don't care
                    assert (self.target.has_version(parent) or
                            parent in copy_set or
                            not self.source.has_version(parent))
                index_memo = self.source._index.get_position(version_id)
                copy_queue_records.append((version_id, index_memo))
                copy_queue.append((version_id, options, parents))
                copy_set.add(version_id)

            # data suck the join:
            count = 0
            total = len(version_list)
            raw_datum = []
            raw_records = []
            for (version_id, raw_data, _), \
                (version_id2, options, parents) in \
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
                     copy_queue):
                assert version_id == version_id2, 'logic error, inconsistent results'
                count = count + 1
                pb.update("Joining knit", count, total)
                if converter:
                    size, raw_data = converter(raw_data, version_id, options,
                                               parents)
                else:
                    size = len(raw_data)
                raw_records.append((version_id, options, parents, size))
                raw_datum.append(raw_data)
            self.target._add_raw_records(raw_records, ''.join(raw_datum))
            return count
        finally:
            pb.finished()

    def _anno_to_plain_converter(self, raw_data, version_id, options,
                                 parents):
        """Convert annotated content to plain content."""
        data, digest = self.source._data._parse_record(version_id, raw_data)
        if 'fulltext' in options:
            content = self.source.factory.parse_fulltext(data, version_id)
            lines = self.target.factory.lower_fulltext(content)
        else:
            delta = self.source.factory.parse_line_delta(data, version_id,
                                                         plain=True)
            lines = self.target.factory.lower_line_delta(delta)
        return self.target._data._record_to_data(version_id, digest, lines)


InterVersionedFile.register_optimiser(InterKnit)


class WeaveToKnit(InterVersionedFile):
    """Optimised code paths for weave to knit operations."""

    _matching_file_from_factory = bzrlib.weave.WeaveFile
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with weaves to knits."""
        try:
            return (isinstance(source, bzrlib.weave.Weave) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, bzrlib.weave.Weave)
        assert isinstance(self.target, KnitVersionedFile)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)

        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)

            self.source_ancestry = set(self.source.get_ancestry(version_ids))
            this_versions = set(self.target._index.get_versions())
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # do the join:
            count = 0
            total = len(version_list)
            parent_map = self.source.get_parent_map(version_list)
            for version_id in version_list:
                pb.update("Converting to knit", count, total)
                parents = parent_map[version_id]
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must already have it
                    assert (self.target.has_version(parent))
                self.target.add_lines(
                    version_id, parents, self.source.get_lines(version_id))
                count = count + 1
            return count
        finally:
            pb.finished()


InterVersionedFile.register_optimiser(WeaveToKnit)


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

    This implementation is for knits with no cached annotations.
    It will work for knits with cached annotations, but this is not
    recommended.
    """
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate(revision_id))


class _KnitAnnotator(object):
    """Build up the annotations for a text."""

    def __init__(self, knit):
        self._knit = knit

        # Content objects, differs from fulltexts because of how final newlines
        # are treated by knits. The content objects here will always have a
        # final newline.
        self._fulltext_contents = {}

        # Annotated lines of specific revisions
        self._annotated_lines = {}

        # Track the raw data for nodes that we could not process yet.
        # This maps the revision_id of the base to a list of children that will
        # be annotated from it.
        self._pending_children = {}

        # Nodes which cannot be extracted
        self._ghosts = set()

        # Track how many children this node has, so we know if we need to keep
        # it around.
        self._annotate_children = {}
        self._compression_children = {}

        self._all_build_details = {}
        # The children => parent revision_id graph
        self._revision_id_graph = {}

        self._heads_provider = None

        self._nodes_to_keep_annotations = set()
        self._generations_until_keep = 100

    def set_generations_until_keep(self, value):
        """Set the number of generations before caching a node.

        Setting this to -1 will cache every merge node, setting this higher
        will cache fewer nodes.
        """
        self._generations_until_keep = value

    def _add_fulltext_content(self, revision_id, content_obj):
        self._fulltext_contents[revision_id] = content_obj
        # TODO: jam 20080305 It might be good to check the sha1digest here
        return content_obj.text()

    def _check_parents(self, child, nodes_to_annotate):
        """Check if all parents have been processed.

        :param child: A tuple of (rev_id, parents, raw_content)
        :param nodes_to_annotate: If child is ready, add it to
            nodes_to_annotate, otherwise put it back in self._pending_children
        """
        for parent_id in child[1]:
            if (parent_id not in self._annotated_lines):
                # This parent is not ready yet, so the child must wait.
                self._pending_children.setdefault(parent_id,
                                                  []).append(child)
                break
        else:
            # This one is ready to be processed
            nodes_to_annotate.append(child)

    def _add_annotation(self, revision_id, fulltext, parent_ids,
                        left_matching_blocks=None):
        """Add an annotation entry.

        All parents should already have been annotated.
        :return: A list of children that now have their parents satisfied.
        """
        a = self._annotated_lines
        annotated_parent_lines = [a[p] for p in parent_ids]
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
                               fulltext, revision_id, left_matching_blocks,
                               heads_provider=self._get_heads_provider()))
        self._annotated_lines[revision_id] = annotated_lines
        for p in parent_ids:
            ann_children = self._annotate_children[p]
            ann_children.remove(revision_id)
            if (not ann_children
                and p not in self._nodes_to_keep_annotations):
                del self._annotated_lines[p]
                del self._all_build_details[p]
                if p in self._fulltext_contents:
                    del self._fulltext_contents[p]
        # Now that we've added this one, see if there are any pending
        # deltas to be done, certainly this parent is finished
        nodes_to_annotate = []
        for child in self._pending_children.pop(revision_id, []):
            self._check_parents(child, nodes_to_annotate)
        return nodes_to_annotate

    def _get_build_graph(self, revision_id):
        """Get the graphs for building texts and annotations.

        The data you need for creating a full text may be different than the
        data you need to annotate that text. (At a minimum, you need both
        parents to create an annotation, but only need 1 parent to generate the
        fulltext.)

        :return: A list of (revision_id, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data.
        """
        if revision_id in self._annotated_lines:
            # Nothing to do
            return []
        pending = set([revision_id])
        records = []
        generation = 0
        kept_generation = 0
        while pending:
            # get all pending nodes
            generation += 1
            this_iteration = pending
            build_details = self._knit._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._knit._index._get_entries(this_iteration)
            pending = set()
            for rev_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                self._revision_id_graph[rev_id] = parents
                records.append((rev_id, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update(p for p in parents
                                 if p not in self._all_build_details)
                if compression_parent:
                    self._compression_children.setdefault(compression_parent,
                        []).append(rev_id)
                if parents:
                    for parent in parents:
                        self._annotate_children.setdefault(parent,
                            set()).add(rev_id)
                    num_gens = generation - kept_generation
                    if ((num_gens >= self._generations_until_keep)
                        and len(parents) > 1):
                        kept_generation = generation
                        self._nodes_to_keep_annotations.add(rev_id)

            missing_versions = this_iteration.difference(build_details.keys())
            self._ghosts.update(missing_versions)
            for missing_version in missing_versions:
                # add a key, no parents
                self._revision_id_graph[missing_version] = ()
                pending.discard(missing_version) # don't look for it
        # XXX: This should probably be a real exception, as it is a data
        #      inconsistency
        assert not self._ghosts.intersection(self._compression_children), \
            "We cannot have nodes which have a compression parent of a ghost."
        # Clean out anything that depends on a ghost so that we don't wait for
        # the ghost to show up
        for node in self._ghosts:
            if node in self._annotate_children:
                # We won't be building this node
                del self._annotate_children[node]
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records

    def _annotate_records(self, records):
        """Build the annotations for the listed records."""
        # We iterate in the order read, rather than a strict order requested.
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        for (rev_id, record,
             digest) in self._knit._data.read_records_iter(records):
            if rev_id in self._annotated_lines:
                continue
            parent_ids = self._revision_id_graph[rev_id]
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
            details = self._all_build_details[rev_id]
            (index_memo, compression_parent, parents,
             record_details) = details
            nodes_to_annotate = []
            # TODO: Remove the punning between compression parents, and
            #       parent_ids, we should be able to do this without assuming
            if len(parent_ids) == 0:
                # There are no parents for this node, so just add it
                # TODO: This probably needs to be decoupled
                assert compression_parent is None
                fulltext_content, delta = self._knit.factory.parse_record(
                    rev_id, record, record_details, None)
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
                    parent_ids, left_matching_blocks=None))
            else:
                child = (rev_id, parent_ids, record)
                # Check if all the parents are present
                self._check_parents(child, nodes_to_annotate)
            while nodes_to_annotate:
                # Should we use a queue here instead of a stack?
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
                (index_memo, compression_parent, parents,
                 record_details) = self._all_build_details[rev_id]
                if compression_parent is not None:
                    comp_children = self._compression_children[compression_parent]
                    assert rev_id in comp_children
                    # If there is only 1 child, it is safe to reuse this
                    # content
                    reuse_content = (len(comp_children) == 1
                        and compression_parent not in
                            self._nodes_to_keep_annotations)
                    if reuse_content:
                        # Remove it from the cache since it will be changing
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
                        # Make sure to copy the fulltext since it might be
                        # modified
                        parent_fulltext = list(parent_fulltext_content.text())
                    else:
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
                        parent_fulltext = parent_fulltext_content.text()
                    comp_children.remove(rev_id)
                    fulltext_content, delta = self._knit.factory.parse_record(
                        rev_id, record, record_details,
                        parent_fulltext_content,
                        copy_base_content=(not reuse_content))
                    fulltext = self._add_fulltext_content(rev_id,
                                                          fulltext_content)
                    blocks = KnitContent.get_line_delta_blocks(delta,
                            parent_fulltext, fulltext)
                else:
                    fulltext_content = self._knit.factory.parse_fulltext(
                        record, rev_id)
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                    blocks = None
                nodes_to_annotate.extend(
                    self._add_annotation(rev_id, fulltext, parent_ids,
                        left_matching_blocks=blocks))

    def _get_heads_provider(self):
        """Create a heads provider for resolving ancestry issues."""
        if self._heads_provider is not None:
            return self._heads_provider
        parent_provider = _mod_graph.DictParentsProvider(
            self._revision_id_graph)
        graph_obj = _mod_graph.Graph(parent_provider)
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
        self._heads_provider = head_cache
        return head_cache

    def annotate(self, revision_id):
        """Return the annotated fulltext at the given revision.

        :param revision_id: The revision id for this file
        """
        records = self._get_build_graph(revision_id)
        if revision_id in self._ghosts:
            raise errors.RevisionNotPresent(revision_id, self._knit)
        self._annotate_records(records)
        return self._annotated_lines[revision_id]


try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data