# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append-only
writes.

lifeless: the data file is made up of "delta records".  Each delta record has a delta header
that contains: (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (i.e. the delta applied to the parent).  The delta also ends with an
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8         e.set('executable', 'yes')
8         if elt.get('executable') == 'yes':
8             ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation
"""

# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?

from cStringIO import StringIO
from itertools import izip, chain
from zlib import Z_DEFAULT_COMPRESSION

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    debug,
    diff,
    errors,
    merge,
    patiencediff,
    progress,
    trace,
    )
""")
from bzrlib.errors import (
    InvalidRevisionId,
    KnitCorrupt,
    KnitHeaderError,
    NoSuchFile,
    RevisionNotPresent,
    RevisionAlreadyPresent,
    )
from bzrlib.graph import Graph
from bzrlib.osutils import (
    contains_whitespace,
    sha_string,
    sha_strings,
    split_lines,
    )
from bzrlib.tsort import topo_sort
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
from bzrlib.versionedfile import (
    AbsentContentFactory,
    adapter_registry,
    ContentFactory,
    VersionedFile,
    )

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'


class KnitAdapter(object):
    """Base class for knit record adaptation."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = _KnitData(None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
        return bytes


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
        return bytes


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for annotated deltas to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for plain deltas to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Let the plain factory parse the record and apply the delta to
        # basis_content for us.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())
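
# For illustration (a sketch, not part of the original API surface): an
# adapter is constructed with a basis versioned file and converts one raw
# knit record at a time, e.g.
#   adapter = DeltaAnnotatedToFullText(basis_vf)
#   text = adapter.get_bytes(factory, raw_gzipped_record)
# where 'factory' is the KnitContentFactory describing the record;
# 'basis_vf' and 'raw_gzipped_record' are assumed names for this example.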


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, version, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for version.

        :param version: The version.
        :param parents: The parents.
        :param build_details: The build details as returned from
            get_build_details.
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.sha1 = sha1
        self.key = (version,)
        self.parents = tuple((parent,) for parent in parents)
        if build_details[0] == 'line-delta':
            kind = 'delta'
        else:
            kind = 'ft'
        if annotated:
            annotated_kind = 'annotated-'
        else:
            annotated_kind = ''
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details
        self._knit = knit

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        else:
            raise errors.UnavailableRepresentation(self.key, storage_kind,
                self.storage_kind)


class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""

    def __init__(self):
        self._should_strip_eol = False

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)

    def cleanup_eol(self, copy_on_mutate=True):
        if self._should_strip_eol:
            if copy_on_mutate:
                self._lines = self._lines[:]
            self.strip_last_line_newline()

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'equal':
                continue
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
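
    # For illustration (a sketch): a delta from ['a\n', 'b\n'] to
    # ['a\n', 'x\n'] is the single hunk (1, 2, 1, ['x\n']) -- replace old
    # lines [1:2) with one new line.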

    @staticmethod
    def get_line_delta_blocks(knit_delta, source, target):
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
        target_len = len(target)
        s_pos = 0
        t_pos = 0
        for s_begin, s_end, t_len, new_text in knit_delta:
            true_n = s_begin - s_pos
            n = true_n
            if n > 0:
                # knit deltas do not provide reliable info about whether the
                # last line of a file matches, due to eol handling.
                if source[s_pos + n - 1] != target[t_pos + n - 1]:
                    n -= 1
                if n > 0:
                    yield s_pos, t_pos, n
            t_pos += t_len + true_n
            s_pos = s_end
        n = target_len - t_pos
        if n > 0:
            if source[s_pos + n - 1] != target[t_pos + n - 1]:
                n -= 1
            if n > 0:
                yield s_pos, t_pos, n
        yield s_pos + (target_len - t_pos), target_len, 0


class AnnotatedKnitContent(KnitContent):
    """Annotated content."""

    def __init__(self, lines):
        KnitContent.__init__(self)
        self._lines = lines

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return list(self._lines)

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count

    def strip_last_line_newline(self):
        line = self._lines[-1][1].rstrip('\n')
        self._lines[-1] = (self._lines[-1][0], line)
        self._should_strip_eol = False

    def text(self):
        try:
            lines = [text for origin, text in self._lines]
        except ValueError, e:
            # most commonly (only?) caused by the internal form of the knit
            # missing annotation information because of a bug - see thread
            # around 20071015
            raise KnitCorrupt(self,
                "line in annotated knit missing annotation information: %s"
                % (e,))
        if self._should_strip_eol:
            lines[-1] = lines[-1].rstrip('\n')
        return lines

    def copy(self):
        return AnnotatedKnitContent(self._lines[:])


class PlainKnitContent(KnitContent):
    """Unannotated content.

    When annotate[_iter] is called on this content, the same version is reported
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
    objects.
    """

    def __init__(self, lines, version_id):
        KnitContent.__init__(self)
        self._lines = lines
        self._version_id = version_id

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return [(self._version_id, line) for line in self._lines]

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
        self._version_id = new_version_id

    def copy(self):
        return PlainKnitContent(self._lines[:], self._version_id)

    def strip_last_line_newline(self):
        self._lines[-1] = self._lines[-1].rstrip('\n')
        self._should_strip_eol = False

    def text(self):
        lines = self._lines
        if self._should_strip_eol:
            lines = lines[:]
            lines[-1] = lines[-1].rstrip('\n')
        return lines


class _KnitFactory(object):
    """Base class for common Factory functions."""

    def parse_record(self, version_id, record, record_details,
                     base_content, copy_base_content=True):
        """Parse a record into a full content object.

        :param version_id: The official version id for this content
        :param record: The data returned by read_records_iter()
        :param record_details: Details about the record returned by
            get_build_details
        :param base_content: If get_build_details returns a compression_parent,
            you must return a base_content here, else use None
        :param copy_base_content: When building from the base_content, decide
            you can either copy it and return a new object, or modify it in
            place.
        :return: (content, delta) A Content object and possibly a line-delta,
            delta may be None
        """
        method, noeol = record_details
        if method == 'line-delta':
            assert base_content is not None
            if copy_base_content:
                content = base_content.copy()
            else:
                content = base_content
            delta = self.parse_line_delta(record, version_id)
            content.apply_delta(delta, version_id)
        else:
            content = self.parse_fulltext(record, version_id)
            delta = None
        content._should_strip_eol = noeol
        return (content, delta)
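
    # For illustration (a sketch): record_details is (method, noeol).
    # ('line-delta', False) means 'record' holds a delta to apply to
    # base_content; ('fulltext', True) means a full text whose last line
    # should have its trailing newline stripped after building.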


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    annotated = True

    def make(self, lines, version_id):
        num_lines = len(lines)
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))

    def parse_fulltext(self, content, version_id):
        """Convert fulltext to internal representation

        fulltext content is of the format
        revid(utf8) plaintext\n
        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return AnnotatedKnitContent(lines)

    def parse_line_delta_iter(self, lines, version_id):
        return iter(self.parse_line_delta(lines, version_id))

    def parse_line_delta(self, lines, version_id, plain=False):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])

        :param plain: If True, the lines are returned as a plain
            list without annotations, not as a list of (origin, content) tuples, i.e.
            (start, end, count, [1..count newline])
        """
        result = []
        lines = iter(lines)
        next = lines.next

        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
                result.append((start, end, count, contents))
        return result
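
    # For illustration (a sketch of the serialized form parsed above): the
    # hunk header "1,2,1\n" followed by the line "rev-1 x\n" parses to
    # (1, 2, 1, [('rev-1', 'x\n')]), or to (1, 2, 1, ['x\n']) when
    # plain=True.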

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text

    def lower_fulltext(self, content):
        """convert a fulltext content record into a serializable form.

        see parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out

    def annotate(self, knit, version_id):
        content = knit._get_content(version_id)
        return content.annotate()


class KnitPlainFactory(_KnitFactory):
    """Factory for creating plain Content objects."""

    annotated = False

    def make(self, lines, version_id):
        return PlainKnitContent(lines, version_id)

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, lines[cur:cur+c]
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()

    def lower_fulltext(self, content):
        return content.text()

    def lower_line_delta(self, delta):
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(lines)
        return out

    def annotate(self, knit, version_id):
        annotator = _KnitAnnotator(knit)
        return annotator.annotate(version_id)


def make_empty_knit(transport, relpath):
    """Construct an empty knit at the specified location."""
    k = make_file_knit(relpath, transport, factory=KnitPlainFactory(),
        create=True)


def make_file_knit(name, transport, file_mode=None, access_mode='w',
    factory=None, delta=True, create=False, create_parent_dir=False,
    delay_create=False, dir_mode=None, get_scope=None):
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
    if factory is None:
        factory = KnitAnnotateFactory()
    if get_scope is None:
        get_scope = lambda: None
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
        access_mode, create=create, file_mode=file_mode,
        create_parent_dir=create_parent_dir, delay_create=delay_create,
        dir_mode=dir_mode, get_scope=get_scope)
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
        dir_mode, ((create and not len(index)) and delay_create),
        create_parent_dir)
    return KnitVersionedFile(name, transport, factory=factory,
        create=create, delay_create=delay_create, index=index,
        access_method=access)


def get_suffixes():
    """Return the suffixes used by file based knits."""
    return [DATA_SUFFIX, INDEX_SUFFIX]
make_file_knit.get_suffixes = get_suffixes
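
# For illustration (a sketch; 'transport' is assumed to be a writable
# bzrlib transport, e.g. from bzrlib.transport.get_transport('.')):
#   knit = make_file_knit('example', transport, create=True)
#   knit.add_lines('rev-1', [], ['hello\n'])
#   knit.add_lines('rev-2', ['rev-1'], ['hello\n', 'world\n'])
#   assert knit.get_lines('rev-2') == ['hello\n', 'world\n']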


class KnitVersionedFile(VersionedFile):
    """Weave-like structure with faster random access.

    A knit stores a number of texts and a summary of the relationships
    between them.  Texts are identified by a string version-id.  Texts
    are normally stored and retrieved as a series of lines, but can
    also be passed as single strings.

    Lines are stored with the trailing newline (if any) included, to
    avoid special cases for files with no final newline.  Lines are
    composed of 8-bit characters, not unicode.  The combination of
    these approaches should mean any 'binary' file can be safely
    stored and retrieved.
    """

    def __init__(self, relpath, transport, file_mode=None,
        factory=None, delta=True, create=False, create_parent_dir=False,
        delay_create=False, dir_mode=None, index=None, access_method=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        :param index: An index to use for the knit.
        """
        super(KnitVersionedFile, self).__init__()
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.delta = delta
        self._max_delta_chain = 200
        if None in (access_method, index):
            raise ValueError("No default access_method or index any more")
        self._index = index
        _access = access_method
        if create and not len(self) and not delay_create:
            _access.create()
        self._data = _KnitData(_access)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
            self.transport.abspath(self.filename))

    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext. It searches for _max_delta_chain parents. When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            index, pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parent_map([parent])[parent]
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size

    def _check_write_ok(self):
        return self._index._check_write_ok()

    def _add_raw_records(self, records, data):
        """Add all the records 'records' with data pre-joined in 'data'.

        :param records: A list of tuples(version_id, options, parents, size).
        :param data: The data for the records. When it is written, the records
            are adjusted to have pos pointing into data by the sum of
            the preceding records sizes.
        """
        # write all the data
        raw_record_sizes = [record[3] for record in records]
        positions = self._data.add_raw_records(raw_record_sizes, data)
        index_entries = []
        for (version_id, options, parents, _), access_memo in zip(
                records, positions):
            index_entries.append((version_id, options, access_memo, parents))
        self._index.add_versions(index_entries)

    def copy_to(self, name, transport):
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
                self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

    def get_data_stream(self, required_versions):
        """Get a data stream for the specified versions.

        Versions may be returned in any order, not necessarily the order
        specified.  They are returned in a partial order by compression
        parent, so that the deltas can be applied as the data stream is
        inserted; however note that compression parents will not be sent
        unless they were specifically requested, as the client may already
        have them.

        :param required_versions: The exact set of versions to be extracted.
            Unlike some other knit methods, this is not used to generate a
            transitive closure, rather it is used precisely as given.

        :returns: format_signature, list of (version, options, length, parents),
            reader_callable.
        """
        required_version_set = frozenset(required_versions)
        version_index = {}
        # list of revisions that can just be sent without waiting for their
        # compression parent
        ready_to_send = []
        # map from revision to the children based on it
        deferred = {}
        # first, read all relevant index data, enough to sort into the right
        # order to return
        for version_id in required_versions:
            options = self._index.get_options(version_id)
            parents = self._index.get_parents_with_ghosts(version_id)
            index_memo = self._index.get_position(version_id)
            version_index[version_id] = (index_memo, options, parents)
            if ('line-delta' in options
                and parents[0] in required_version_set):
                # must wait until the parent has been sent
                deferred.setdefault(parents[0], []). \
                    append(version_id)
            else:
                # either a fulltext, or a delta whose parent the client did
                # not ask for and presumably already has
                ready_to_send.append(version_id)
        # build a list of results to return, plus instructions for data to
        # read from the file
        copy_queue_records = []
        temp_version_list = []
        while ready_to_send:
            # XXX: pushing and popping lists may be a bit inefficient
            version_id = ready_to_send.pop(0)
            (index_memo, options, parents) = version_index[version_id]
            copy_queue_records.append((version_id, index_memo))
            none, data_pos, data_size = index_memo
            temp_version_list.append((version_id, options, data_size,
                parents))
            if version_id in deferred:
                # now we can send all the children of this revision - we could
                # put them in anywhere, but we hope that sending them soon
                # after the fulltext will give good locality in the receiver
                ready_to_send[:0] = deferred.pop(version_id)
        assert len(deferred) == 0, \
            "Still have compressed child versions waiting to be sent"
        # XXX: The stream format is such that we cannot stream it - we have to
        # know the length of all the data a-priori.
        raw_datum = []
        result_version_list = []
        for (version_id, raw_data, _), \
            (version_id2, options, _, parents) in \
            izip(self._data.read_records_iter_raw(copy_queue_records),
                 temp_version_list):
            assert version_id == version_id2, \
                'logic error, inconsistent results'
            raw_datum.append(raw_data)
            result_version_list.append(
                (version_id, options, len(raw_data), parents))
        # provide a callback to get data incrementally.
        pseudo_file = StringIO(''.join(raw_datum))
        def read(length):
            if length is None:
                return pseudo_file.read()
            else:
                return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)
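
    # For illustration (a sketch): a data stream round-trip between two
    # knits with the same format signature ('source' and 'target' are
    # assumed names):
    #   format, version_list, read = source.get_data_stream(['rev-1'])
    #   target.insert_data_stream((format, version_list, read))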

    def get_record_stream(self, versions, ordering, include_delta_closure):
        """Get a stream of records for versions.

        :param versions: The versions to include. Each version is a tuple
            (version,).
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        if include_delta_closure:
            # Nb: what we should do is plan the data to stream to allow
            # reconstruction of all the texts without excessive buffering,
            # including re-sending common bases as needed. This makes the most
            # sense when we start serialising these streams though, so for now
            # we just fallback to individual text construction behind the
            # abstraction barrier.
            knit = self
        else:
            knit = None
        # Double index lookups here : need a unified api ?
        parent_map = self.get_parent_map(versions)
        absent_versions = set(versions) - set(parent_map)
        if ordering == 'topological':
            present_versions = topo_sort(parent_map)
        else:
            # List comprehension to keep the requested order (as that seems
            # marginally useful, at least until we start doing IO optimising
            # here.
            present_versions = [version for version in versions if version in
                parent_map]
        position_map = self._get_components_positions(present_versions)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(version, position_map[version][1]) for version in
            present_versions]
        for version in absent_versions:
            yield AbsentContentFactory((version,))
        for version, raw_data, sha1 in \
                self._data.read_records_iter_raw(records):
            (record_details, index_memo, _) = position_map[version]
            yield KnitContentFactory(version, parent_map[version],
                record_details, sha1, raw_data, self.factory.annotated, knit)

    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None
        parent, sha1, noeol, delta = self.get_delta(version_id)
        return KnitContent.get_line_delta_blocks(delta, source, target)

    def get_delta(self, version_id):
        """Get a delta for constructing version from some other version."""
        self.check_not_reserved_id(version_id)
        parents = self.get_parent_map([version_id])[version_id]
        if len(parents):
            parent = parents[0]
        else:
            parent = None
        index_memo = self._index.get_position(version_id)
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
        noeol = 'no-eol' in self._index.get_options(version_id)
        if 'fulltext' == self._index.get_method(version_id):
            new_content = self.factory.parse_fulltext(data, version_id)
            if parent is not None:
                reference_content = self._get_content(parent)
                old_texts = reference_content.text()
            else:
                old_texts = []
            new_texts = new_content.text()
            delta_seq = patiencediff.PatienceSequenceMatcher(None, old_texts,
                new_texts)
            return parent, sha1, noeol, self._make_line_delta(delta_seq, new_content)
        else:
            delta = self.factory.parse_line_delta(data, version_id)
            return parent, sha1, noeol, delta

    def get_format_signature(self):
        """See VersionedFile.get_format_signature()."""
        if self.factory.annotated:
            annotated_part = "annotated"
        else:
            annotated_part = "plain"
        return "knit-%s" % (annotated_part,)

    def get_sha1s(self, version_ids):
        """See VersionedFile.get_sha1s()."""
        record_map = self._get_record_map(version_ids)
        # record entry 2 is the 'digest'.
        return [record_map[v][2] for v in version_ids]

    def insert_data_stream(self, (format, data_list, reader_callable)):
        """Insert knit records from a data stream into this knit.

        If a version in the stream is already present in this knit, it will not
        be inserted a second time. It will be checked for consistency with the
        stored version however, and may cause a KnitCorrupt error to be raised
        if the data in the stream disagrees with the already stored data.

        :seealso: get_data_stream
        """
        if format != self.get_format_signature():
            if 'knit' in debug.debug_flags:
                trace.mutter(
                    'incompatible format signature inserting to %r', self)
            source = self._knit_from_datastream(
                (format, data_list, reader_callable))
            stream = source.get_record_stream(source.versions(), 'unordered', False)
            self.insert_record_stream(stream)
            return

        for version_id, options, length, parents in data_list:
            if self.has_version(version_id):
                # First check: the list of parents.
                my_parents = self.get_parents_with_ghosts(version_id)
                if tuple(my_parents) != tuple(parents):
                    # XXX: KnitCorrupt is not quite the right exception here.
                    raise KnitCorrupt(
                        self.filename,
                        'parents list %r from data stream does not match '
                        'already recorded parents %r for %s'
                        % (parents, my_parents, version_id))

                # Also check the SHA-1 of the fulltext this content will
                # become.
                raw_data = reader_callable(length)
                my_fulltext_sha1 = self.get_sha1s([version_id])[0]
                df, rec = self._data._parse_record_header(version_id, raw_data)
                stream_fulltext_sha1 = rec[3]
                if my_fulltext_sha1 != stream_fulltext_sha1:
                    # Actually, we don't know if it's this knit that's corrupt,
                    # or the data stream we're trying to insert.
                    raise KnitCorrupt(
                        self.filename, 'sha-1 does not match %s' % version_id)
            else:
                if 'line-delta' in options:
                    # Make sure that this knit record is actually useful: a
                    # line-delta is no use unless we have its parent.
                    # Fetching from a broken repository with this problem
                    # shouldn't break the target repository.
                    #
                    # See https://bugs.launchpad.net/bzr/+bug/164443
                    if not self._index.has_version(parents[0]):
                        raise KnitCorrupt(
                            self.filename,
                            'line-delta from stream '
                            'for version %s '
                            'references '
                            'missing parent %s\n'
                            'Try running "bzr check" '
                            'on the source repository, and "bzr reconcile" '
                            'if necessary.' %
                            (version_id, parents[0]))
                    if not self.delta:
                        # We received a line-delta record for a non-delta knit.
                        # Convert it to a fulltext.
                        gzip_bytes = reader_callable(length)
                        lines, sha1 = self._data._parse_record(
                            version_id, gzip_bytes)
                        delta = self.factory.parse_line_delta(lines,
                            version_id)
                        content = self.factory.make(
                            self.get_lines(parents[0]), parents[0])
                        content.apply_delta(delta, version_id)
                        digest, text_length, content = self.add_lines(
                            version_id, parents, content.text())
                        if digest != sha1:
                            raise errors.VersionedFileInvalidChecksum(version_id)
                        continue

                self._add_raw_records(
                    [(version_id, options, parents, length)],
                    reader_callable(length))

    def _knit_from_datastream(self, (format, data_list, reader_callable)):
        """Create a knit object from a data stream.

        This method exists to allow conversion of data streams that do not
        match the signature of this knit. Generally it will be slower and use
        more memory to use this method to insert data, but it will work.

        :seealso: get_data_stream for details on datastreams.
        :return: A knit versioned file which can be used to join the datastream
            into self.
        """
        if format == "knit-plain":
            factory = KnitPlainFactory()
        elif format == "knit-annotated":
            factory = KnitAnnotateFactory()
        else:
            raise errors.KnitDataStreamUnknown(format)
        index = _StreamIndex(data_list, self._index)
        access = _StreamAccess(reader_callable, index, self, factory)
        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def insert_record_stream(self, stream):
        """Insert a record stream into this versioned file.

        :param stream: A stream of records to insert.
        :seealso VersionedFile.get_record_stream:
        """
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        if self.factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            convertibles = []
        else:
            # self is not annotated, but we can strip annotations cheaply.
            annotated = ""
            convertibles = set(["knit-annotated-delta-gz",
                "knit-annotated-ft-gz"])
        native_types = set()
        native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        adapters = {}
        # Buffered index entries that we can't add immediately because their
        # basis parent is missing. We don't buffer all because generating
        # annotations may require access to some of the new records. However we
        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # reports on the new keys.
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key[0]], self)
            # adapt to non-tuple interface
            parents = [parent[0] for parent in record.parents]
            if record.storage_kind in knit_types:
                if record.storage_kind not in native_types:
                    try:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                    except KeyError:
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                else:
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by having the kndx index support raising on a duplicate
                # add with identical parents and options.
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
                index_entry = (record.key[0], options, access_memo, parents)
                buffered = False
                if 'fulltext' not in options:
                    basis_parent = parents[0]
                    if not self.has_version(basis_parent):
                        pending = buffered_index_entries.setdefault(
                            basis_parent, [])
                        pending.append(index_entry)
                        buffered = True
                if not buffered:
                    self._index.add_versions([index_entry])
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key[0], parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                try:
                    self.add_lines(record.key[0], parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
            # Add any records whose basis parent is now available.
            added_keys = [record.key[0]]
            while added_keys:
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_versions(index_entries)
                    added_keys.extend(
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
                self)
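
    # For illustration (a sketch): copying all versions between two knits
    # via the record stream API ('source' and 'target' are assumed names):
    #   stream = source.get_record_stream(source.versions(), 'topological',
    #       False)
    #   target.insert_record_stream(stream)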

    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "versions scales with size of history")
        return self._index.get_versions()

    def has_version(self, version_id):
        """See VersionedFile.has_version."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "has_version is a LBYL scenario")
        return self._index.has_version(version_id)

    __contains__ = has_version

    def _merge_annotations(self, content, parents, parent_texts={},
                           delta=None, annotated=None,
                           left_matching_blocks=None):
        """Merge annotations for content.  This is done by comparing
        the annotations based on changes to the text.
        """
        if left_matching_blocks is not None:
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
        else:
            delta_seq = None
        if annotated:
            for parent_id in parents:
                merge_content = self._get_content(parent_id, parent_texts)
                if (parent_id == parents[0] and delta_seq is not None):
                    seq = delta_seq
                else:
                    seq = patiencediff.PatienceSequenceMatcher(
                        None, merge_content.text(), content.text())
                for i, j, n in seq.get_matching_blocks():
                    if n == 0:
                        continue
                    # this appears to copy (origin, text) pairs across to the
                    # new content for any line that matches the last-checked
                    # parent.
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
        if delta:
            if delta_seq is None:
                reference_content = self._get_content(parents[0], parent_texts)
                new_texts = content.text()
                old_texts = reference_content.text()
                delta_seq = patiencediff.PatienceSequenceMatcher(
                    None, old_texts, new_texts)
            return self._make_line_delta(delta_seq, content)

    def _make_line_delta(self, delta_seq, new_content):
        """Generate a line delta from delta_seq and new_content."""
        diff_hunks = []
        for op in delta_seq.get_opcodes():
            if op[0] == 'equal':
                continue
            diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
        return diff_hunks

    def _get_components_positions(self, version_ids):
        """Produce a map of position data for the components of versions.

        This data is intended to be used for retrieving the knit records.

        A dict of version_id to (record_details, index_memo, next, parents) is
        returned.
        method is the way referenced data should be applied.
        index_memo is the handle to pass to the data access to actually get the
            data.
        next is the build-parent of the version, or None for fulltexts.
        parents is the version_ids of the parents of this version
        """
        component_data = {}
        pending_components = version_ids
        while pending_components:
            build_details = self._index.get_build_details(pending_components)
            current_components = set(pending_components)
            pending_components = set()
            for version_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                method = record_details[0]
                if compression_parent is not None:
                    pending_components.add(compression_parent)
                component_data[version_id] = (record_details, index_memo,
                    compression_parent)
            missing = current_components.difference(build_details)
            if missing:
                raise errors.RevisionNotPresent(missing.pop(), self.filename)
        return component_data

    def _get_content(self, version_id, parent_texts={}):
        """Returns a content object that makes up the specified
        version."""
        cached_version = parent_texts.get(version_id, None)
        if cached_version is not None:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
            return cached_version
        text_map, contents_map = self._get_content_maps([version_id])
        return contents_map[version_id]

    def _check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        self._index.check_versions_present(version_ids)

    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts,
        nostore_sha, random_id, check_content, left_matching_blocks):
        """See VersionedFile.add_lines_with_ghosts()."""
        self._check_add(version_id, lines, random_id, check_content)
        return self._add(version_id, lines, parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _add_lines(self, version_id, parents, lines, parent_texts,
        left_matching_blocks, nostore_sha, random_id, check_content):
        """See VersionedFile.add_lines."""
        self._check_add(version_id, lines, random_id, check_content)
        self._check_versions_present(parents)
        return self._add(version_id, lines[:], parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _check_add(self, version_id, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""
        if contains_whitespace(version_id):
            raise InvalidRevisionId(version_id, self.filename)
        self.check_not_reserved_id(version_id)
        # Technically this could be avoided if we are happy to allow duplicate
        # id insertion when other things than bzr core insert texts, but it
        # seems useful for folk using the knit api directly to have some safety
        # blanket that we can disable.
        if not random_id and self.has_version(version_id):
            raise RevisionAlreadyPresent(version_id, self.filename)
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def _add(self, version_id, lines, parents, delta, parent_texts,
             left_matching_blocks, nostore_sha, random_id):
        """Add a set of lines on top of version specified by parents.

        If delta is true, compress the text as a line-delta against
        the first parent.

        Any versions not present will be converted into ghosts.
        """
        # first thing, if the content is something we don't need to store, find
        # that out.
        line_bytes = ''.join(lines)
        digest = sha_string(line_bytes)
        if nostore_sha == digest:
            raise errors.ExistingContent

        present_parents = []
        if parent_texts is None:
            parent_texts = {}
        for parent in parents:
            if self.has_version(parent):
                present_parents.append(parent)

        # can only compress against the left most present parent.
        if (delta and
            (len(present_parents) == 0 or
             present_parents[0] != parents[0])):
            delta = False

        text_length = len(line_bytes)
        options = []
        if lines:
            if lines[-1][-1] != '\n':
                # copy the contents of lines.
                lines = lines[:]
                options.append('no-eol')
                lines[-1] = lines[-1] + '\n'
                line_bytes += '\n'

        if delta:
            # To speed the extract of texts the delta chain is limited
            # to a fixed number of deltas.  This should minimize both
            # I/O and the time spend applying deltas.
            delta = self._check_should_delta(present_parents)

        assert isinstance(version_id, str)
        content = self.factory.make(lines, version_id)
        if delta or (self.factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                parent_texts, delta, self.factory.annotated,
                left_matching_blocks)

        if delta:
            options.append('line-delta')
            store_lines = self.factory.lower_line_delta(delta_hunks)
            size, bytes = self._data._record_to_data(version_id, digest,
                store_lines)
        else:
            options.append('fulltext')
            # isinstance is slower and we have no hierarchy.
            if self.factory.__class__ == KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                size, bytes = self._data._record_to_data(version_id, digest,
                    lines, [line_bytes])
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self.factory.lower_fulltext(content)
                size, bytes = self._data._record_to_data(version_id, digest,
                    store_lines)

        access_memo = self._data.add_raw_records([size], bytes)[0]
        self._index.add_versions(
            ((version_id, options, access_memo, parents),),
            random_id=random_id)
        return digest, text_length, content

    def check(self, progress_bar=None):
        """See VersionedFile.check()."""
        # This doesn't actually test extraction of everything, but that will
        # impact 'bzr check' substantially, and needs to be integrated with
        # care. However, it does check for the obvious problem of a delta with
        # no basis.
        versions = self.versions()
        parent_map = self.get_parent_map(versions)
        for version in versions:
            if self._index.get_method(version) != 'fulltext':
                compression_parent = parent_map[version][0]
                if compression_parent not in parent_map:
                    raise errors.KnitCorrupt(self,
                        "Missing basis parent %s for %s" % (
                        compression_parent, version))

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

    def _get_record_map(self, version_ids):
        """Produce a dictionary of knit records.

        :return: {version_id:(record, record_details, digest, next)}
            record
                data returned from read_records
            record_details
                opaque information to pass to parse_record
            digest
                SHA1 digest of the full text after all steps are done
            next
                build-parent of the version, i.e. the leftmost ancestor.
                Will be None if the record is not a delta.
        """
        position_map = self._get_components_positions(version_ids)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(c, i_m) for c, (r, i_m, n)
                   in position_map.iteritems()]
        record_map = {}
        for component_id, record, digest in \
                self._data.read_records_iter(records):
            (record_details, index_memo, next) = position_map[component_id]
            record_map[component_id] = record, record_details, digest, next

        return record_map

    def get_text(self, version_id):
        """See VersionedFile.get_text"""
        return self.get_texts([version_id])[0]

    def get_texts(self, version_ids):
        return [''.join(l) for l in self.get_line_list(version_ids)]

    def get_line_list(self, version_ids):
        """Return the texts of listed versions as a list of strings."""
        for version_id in version_ids:
            self.check_not_reserved_id(version_id)
        text_map, content_map = self._get_content_maps(version_ids)
        return [text_map[v] for v in version_ids]

    _get_lf_split_line_list = get_line_list

    def _get_content_maps(self, version_ids):
        """Produce maps of text and KnitContents

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
            Both dicts take version_ids as their keys.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply delta's to it is > 1 or it is part of the
        # final output.
        version_ids = list(version_ids)
        multiple_versions = len(version_ids) != 1
        record_map = self._get_record_map(version_ids)

        text_map = {}
        content_map = {}
        final_content = {}
        for version_id in version_ids:
            components = []
            cursor = version_id
            while cursor is not None:
                record, record_details, digest, next = record_map[cursor]
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    break
                cursor = next

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    content, delta = self.factory.parse_record(version_id,
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content

            content.cleanup_eol(copy_on_mutate=multiple_versions)
            final_content[version_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise KnitCorrupt(self.filename,
                    '\n  sha-1 %s'
                    '\n  of reconstructed text does not match'
                    '\n  expected sha-1 %s'
                    '\n  for version %s' %
                    (actual_sha, digest, version_id))
            text_map[version_id] = text
        return text_map, final_content

    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                index_memo = self._index.get_position(version_id)
                version_id_records.append((version_id, index_memo))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
                enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)
            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            # XXX: It might be more efficient to yield (version_id,
            # line_iterator) in the future. However for now, this is a simpler
            # change to integrate into the rest of the codebase. RBC 20071110
            for line in line_iterator:
                yield line, version_id

        pb.update('Walking content.', total, total)

    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()

    __len__ = num_versions

    def annotate(self, version_id):
        """See VersionedFile.annotate."""
        return self.factory.annotate(self, version_id)

    def get_parent_map(self, version_ids):
        """See VersionedFile.get_parent_map."""
        return self._index.get_parent_map(version_ids)

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry(versions, topo_sorted)

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry_with_ghosts(versions)

    def plan_merge(self, ver_a, ver_b):
        """See VersionedFile.plan_merge."""
        ancestors_b = set(self.get_ancestry(ver_b, topo_sorted=False))
        ancestors_a = set(self.get_ancestry(ver_a, topo_sorted=False))
        annotated_a = self.annotate(ver_a)
        annotated_b = self.annotate(ver_b)
        return merge._plan_annotate_merge(annotated_a, annotated_b,
                                          ancestors_a, ancestors_b)


class _KnitComponentFile(object):
    """One of the files used to implement a knit database"""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename

    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                filename=self._transport.abspath(self._filename))

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self._filename)


class _KnitIndex(_KnitComponentFile):
    """Manages knit index file.

    The index is already kept in memory and read on startup, to enable
    fast lookups of revision information.  The cursor of the index
    file is always pointing to the end, making it easy to append
    entries.

    _cache is a cache for fast mapping from version id to an Index
    object.

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number.  As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id;
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record. The same revision can be present in an index file more than once.
    The first occurrence gets assigned a sequence number starting from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        at which the compressed data starts.
    LENGTH is the ascii representation of the length of the data file.
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
        REVISION_ID.
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.

    partial writes:
    when a write is interrupted to the index file, it will result in a line
    that does not end in ' :'. If the ' :' is not present at the end of a line,
    or at the end of the file, then the record that is missing it will be
    ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write was
    interrupted. As a result it's normal for the last line in the index to be
    missing a trailing newline. One can be added with no harmful effects.
    """

    HEADER = "# bzr knit index 8\n"

    # speed of knit parsing went from 280 ms to 280 ms with slots addition.
    # __slots__ = ['_cache', '_history', '_transport', '_filename']
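
    # For illustration (a sketch of one line in the format described above):
    #   rev-2 line-delta 1042 73 0 :
    # i.e. version 'rev-2' is stored as a line-delta starting at byte 1042
    # of the data file, is 73 bytes long, and its single parent is the
    # version with sequence number 0.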

    def _cache_version(self, version_id, options, pos, size, parents):
        """Cache a version record in the history array and index cache.

        This is inlined into _load_data for performance. KEEP IN SYNC.
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
        indexes).
        """
        # only want the _history index to reference the 1st index entry
        # for version_id
        if version_id not in self._cache:
            index = len(self._history)
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)

    def _check_write_ok(self):
        if self._get_scope() != self._scope:
            raise errors.OutSideTransaction()
        if self._mode != 'w':
            raise errors.ReadOnlyObjectDirtiedError(self)

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None,
                 get_scope=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)
        self._scope = get_scope()
        self._get_scope = get_scope

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return topo_sort(graph.items())

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # if not completed
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
            parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            if version_id not in self._cache:
                # ghosts are omitted
                continue
            method = self.get_method(version_id)
            parents = self.get_parents_with_ghosts(version_id)
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = 'no-eol' in self.get_options(version_id)
            index_memo = self.get_position(version_id)
            result[version_id] = (index_memo, compression_parent,
                                  parents, (method, noeol))
        return result
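
    # Example (hypothetical values): for a line-delta record the returned
    # dict maps, e.g.,
    #   'rev-2': ((None, 130, 482),        # index_memo for .knit access
    #             'rev-1',                 # compression parent
    #             ('rev-1',),              # logical parents
    #             ('line-delta', False))   # (method, noeol) record_details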

    def num_versions(self):
        return len(self._history)

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return self._history

    def _version_list_to_index(self, versions):
        result_list = []
        cache = self._cache
        for version in versions:
            if version in cache:
                # -- inlined lookup() --
                result_list.append(str(cache[version][5]))
                # -- end lookup () --
            else:
                result_list.append('.' + version)
        return ' '.join(result_list)
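
    # Example (hypothetical): with 'rev-1' cached at sequence number 0 and
    # 'ghost-rev' absent from the cache,
    # _version_list_to_index(['rev-1', 'ghost-rev']) returns '0 .ghost-rev'.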

    def add_version(self, version_id, options, index_memo, parents):
        """Add a version record to the index."""
        self.add_versions(((version_id, options, index_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        :param versions: a list of tuples:
            (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, (index, pos, size), parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                assert isinstance(line, str), \
                    'content must be utf-8 encoded: %r' % (line,)
                lines.append(line)
                self._cache_version(version_id, options, pos, size, tuple(parents))
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise

    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return details needed to access the version.

        .kndx indices do not support split-out data, so return None for the
        index field.

        :return: a tuple (None, data position, size) to hand to the access
            logic to get the record.
        """
        entry = self._cache[version_id]
        return None, entry[2], entry[3]

    def get_method(self, version_id):
        """Return compression method of specified version."""
        try:
            options = self._cache[version_id][1]
        except KeyError:
            raise RevisionNotPresent(version_id, self._filename)
        if 'fulltext' in options:
            return 'fulltext'
        else:
            if 'line-delta' not in options:
                raise errors.KnitIndexUnknownMethod(self._full_path(), options)
            return 'line-delta'

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        return self._cache[version_id][1]

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        for version_id in version_ids:
            try:
                result[version_id] = tuple(self._cache[version_id][4])
            except KeyError:
                pass
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)


class KnitGraphIndex(object):
    """A knit index that builds on GraphIndex."""

    def __init__(self, graph_index, deltas=False, parents=True, add_callback=None):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param deltas: Allow delta-compressed records.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param parents: If True, record the knit's parents; if not, do not
            record parents.
        """
        self._graph_index = graph_index
        self._deltas = deltas
        self._add_callback = add_callback
        self._parents = parents
        if deltas and not parents:
            raise KnitCorrupt(self, "Cannot do delta compression without "
                "parent tracking.")

    def _check_write_ok(self):
        pass

    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        :param keys: An iterable of index keys; 1-tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise RevisionNotPresent(missing_keys.pop(), self)

    def _present_keys(self, version_ids):
        return set([
            node[1] for node in self._get_entries(version_ids)])

    def _parentless_ancestry(self, versions):
        """Honour the get_ancestry API for parentless knit indices."""
        wanted_keys = self._version_ids_to_keys(versions)
        present_keys = self._present_keys(wanted_keys)
        missing = set(wanted_keys).difference(present_keys)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
        return list(self._keys_to_version_ids(present_keys))

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        ghosts = set()
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            found = set()
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                # don't ask for ghosts - otherwise
                # we can end up looping with pending
                # being entirely ghosted.
                graph[key] = [parent for parent in node_refs[0]
                              if parent not in ghosts]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
                found.add(key)
            ghosts.update(this_iteration.difference(found))
        if versions.difference(graph):
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
        if topo_sorted:
            result_keys = topo_sort(graph.items())
        else:
            result_keys = graph.iterkeys()
        return [key[0] for key in result_keys]
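
    # Example (hypothetical): if 'rev-2' lists parents ('rev-1', 'ghost') and
    # 'ghost' has no index entry, the graph built above is
    #   {('rev-2',): [('rev-1',)], ('rev-1',): []}
    # and the ghost is simply dropped from the returned ancestry; the final
    # list comprehension strips the 1-tuple keys back to version ids.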

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                graph[key] = node_refs[0]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
            missing_versions = this_iteration.difference(graph)
            missing_needed = versions.intersection(missing_versions)
            if missing_needed:
                raise RevisionNotPresent(missing_needed.pop(), self)
            for missing_version in missing_versions:
                # add a key, no parents
                graph[missing_version] = []
                pending.discard(missing_version) # don't look for it
        result_keys = topo_sort(graph.items())
        return [key[0] for key in result_keys]

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
            parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        entries = self._get_entries(self._version_ids_to_keys(version_ids), True)
        for entry in entries:
            version_id = self._keys_to_version_ids((entry[1],))[0]
            if not self._parents:
                parents = ()
            else:
                parents = self._keys_to_version_ids(entry[3][0])
            if not self._deltas:
                compression_parent = None
            else:
                compression_parent_key = self._compression_parent(entry)
                if compression_parent_key:
                    compression_parent = self._keys_to_version_ids(
                        (compression_parent_key,))[0]
                else:
                    compression_parent = None
            noeol = (entry[2][0] == 'N')
            if compression_parent:
                method = 'line-delta'
            else:
                method = 'fulltext'
            result[version_id] = (self._node_to_position(entry),
                                  compression_parent, parents,
                                  (method, noeol))
        return result

    def _compression_parent(self, an_entry):
        # return the key that an_entry is compressed against, or None
        # Grab the second parent list (as deltas implies parents currently)
        compression_parents = an_entry[3][1]
        if not compression_parents:
            return None
        assert len(compression_parents) == 1
        return compression_parents[0]

    def _get_method(self, node):
        if not self._deltas:
            return 'fulltext'
        if self._compression_parent(node):
            return 'line-delta'
        else:
            return 'fulltext'

    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return [node[1][0] for node in self._graph_index.iter_all_entries()]

    def has_version(self, version_id):
        """True if the version is in the index."""
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1

    def _keys_to_version_ids(self, keys):
        return tuple(key[0] for key in keys)

    def get_position(self, version_id):
        """Return details needed to access the version.

        :return: a tuple (index, data position, size) to hand to the access
            logic to get the record.
        """
        node = self._get_node(version_id)
        return self._node_to_position(node)

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])
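
    # Example (hypothetical value string): for a node whose value is
    # 'N130 482' (no-eol flag byte 'N', then "offset length"), this returns
    # (node[0], 130, 482) - the owning index, byte offset and record length.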

    def get_method(self, version_id):
        """Return compression method of specified version."""
        return self._get_method(self._get_node(version_id))

    def _get_node(self, version_id):
        try:
            return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
        except IndexError:
            raise RevisionNotPresent(version_id, self)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        node = self._get_node(version_id)
        options = [self._get_method(node)]
        if node[2][0] == 'N':
            options.append('no-eol')
        return options

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        nodes = self._get_entries(self._version_ids_to_keys(version_ids))
        result = {}
        if self._parents:
            for node in nodes:
                result[node[1][0]] = self._keys_to_version_ids(node[3][0])
        else:
            for node in nodes:
                result[node[1][0]] = ()
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        keys = self._version_ids_to_keys(version_ids)
        present = self._present_keys(keys)
        missing = keys.difference(present)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)

    def add_version(self, version_id, options, access_memo, parents):
        """Add a version record to the index."""
        return self.add_versions(((version_id, options, access_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param versions: a list of tuples:
            (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        # anymore.
        # check for dups
        keys = {}
        for (version_id, options, access_memo, parents) in versions:
            index, pos, size = access_memo
            key = (version_id, )
            parents = tuple((parent, ) for parent in parents)
            if 'no-eol' in options:
                value = 'N'
            else:
                value = ' '
            value += "%d %d" % (pos, size)
            if not self._deltas:
                if 'line-delta' in options:
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
            if self._parents:
                if self._deltas:
                    if 'line-delta' in options:
                        node_refs = (parents, (parents[0],))
                    else:
                        node_refs = (parents, ())
                else:
                    node_refs = (parents, )
            else:
                if parents:
                    raise KnitCorrupt(self, "attempt to add node with parents "
                        "in parentless index.")
                node_refs = ()
            keys[key] = (value, node_refs)
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                if (value, node_refs) != keys[key]:
                    raise KnitCorrupt(self, "inconsistent details in add_versions"
                        ": %s %s" % ((value, node_refs), keys[key]))
                del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)

    def _version_ids_to_keys(self, version_ids):
        return set((version_id, ) for version_id in version_ids)
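
    # Example for add_versions above (hypothetical ids): adding a line-delta
    # record in a delta-compressing, parent-tracking index produces a node
    #   (('rev-2',), 'N130 482', ((('rev-1',),), (('rev-1',),)))
    # i.e. (key, value, (parents, compression_parents)) as handed to
    # self._add_callback.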


class _KnitAccess(object):
    """Access to knit records in a .knit file."""

    def __init__(self, transport, filename, _file_mode, _dir_mode,
                 _need_to_create, _create_parent_dir):
        """Create a _KnitAccess for accessing and inserting data.

        :param transport: The transport the .knit is located on.
        :param filename: The filename of the .knit.
        """
        self._transport = transport
        self._filename = filename
        self._file_mode = _file_mode
        self._dir_mode = _dir_mode
        self._need_to_create = _need_to_create
        self._create_parent_dir = _create_parent_dir

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to wherever the access method is storing data.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is always None
            for the .knit access method.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        if not self._need_to_create:
            base = self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                   create_parent_dir=self._create_parent_dir,
                                   mode=self._file_mode,
                                   dir_mode=self._dir_mode)
            self._need_to_create = False
            base = 0
        result = []
        for size in sizes:
            result.append((None, base, size))
            base += size
        return result

    def create(self):
        """IFF this data access has its own storage area, initialise it.

        :return: None.
        """
        self._transport.put_bytes_non_atomic(self._filename, '',
                                             mode=self._file_mode)

    def open_file(self):
        """IFF this data access can be represented as a single file, open it.

        For knits that are not mapped to a single file on disk this will
        always return None.

        :return: None or a file handle.
        """
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The .knit method ignores
            the index as there is always only a single file.
        :return: An iterator over the bytes of the records.
        """
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
        for pos, data in self._transport.readv(self._filename, read_vector):
            yield data


class _PackAccess(object):
    """Access to knit records via a collection of packs."""

    def __init__(self, index_to_packs, writer=None):
        """Create a _PackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param writer: A tuple (pack.ContainerWriter, write_index) which
            contains the pack to write, and the index that reads from it will
            be associated with.
        """
        if writer:
            self.container_writer = writer[0]
            self.write_index = writer[1]
        else:
            self.container_writer = None
            self.write_index = None
        self.indices = index_to_packs

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data segment.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is the
            write_index object supplied to the PackAccess object.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        result = []
        offset = 0
        for size in sizes:
            p_offset, p_length = self.container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self.write_index, p_offset, p_length))
        return result

    def create(self):
        """Pack based knits do not get individually created."""

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            transport, path = self.indices[index]
            reader = pack.make_readv_reader(transport, path, offsets)
            for names, read_func in reader.iter_records():
                yield read_func(None)
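
    # Example (hypothetical memos): [(idx_a, 0, 10), (idx_a, 10, 20),
    # (idx_b, 0, 5)] is grouped into request_lists =
    #   [(idx_a, [(0, 10), (10, 20)]), (idx_b, [(0, 5)])]
    # so that each pack file is read with a single readv pass.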

    def open_file(self):
        """Pack based knits have no single file."""
        return None

    def set_writer(self, writer, index, (transport, packname)):
        """Set a writer to use for adding data."""
        if index is not None:
            self.indices[index] = (transport, packname)
        self.container_writer = writer
        self.write_index = index


class _StreamAccess(object):
    """A Knit Access object that provides data from a datastream.

    It also provides a fallback that presents annotated data from a *backing*
    access object as unannotated data.

    This is triggered by an index_memo pointing to a different index than the
    one this was constructed with, and is used to allow extracting full
    unannotated texts for insertion into annotated knits.
    """

    def __init__(self, reader_callable, stream_index, backing_knit,
                 orig_factory):
        """Create a _StreamAccess object.

        :param reader_callable: The reader_callable from the datastream.
            This is called to buffer all the data immediately, for
            random access.
        :param stream_index: The index the data stream this provides access to
            which will be present in native index_memo's.
        :param backing_knit: The knit object that will provide access to
            annotated texts which are not available in the stream, so as to
            create unannotated texts.
        :param orig_factory: The original content factory used to generate the
            stream. This is used for checking whether the thunk code for
            supporting _copy_texts will generate the correct form of data.
        """
        self.data = reader_callable(None)
        self.stream_index = stream_index
        self.backing_knit = backing_knit
        self.orig_factory = orig_factory

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable of memos from the
            _StreamIndex object identifying bytes to read; for these classes
            they are (from_backing_knit, index, start, end) and can point to
            either the backing knit or streamed data.
        :return: An iterator yielding a byte string for each record in
            memos_for_retrieval.
        """
        # use a generator for memory friendliness
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
            if not from_backing_knit:
                assert version_id is self.stream_index
                yield self.data[start:end]
                continue
            # we have been asked to thunk. This thunking only occurs when
            # we are obtaining plain texts from an annotated backing knit
            # so that _copy_texts will work.
            # We could improve performance here by scanning for where we need
            # to do this and using get_line_list, then interleaving the output
            # as desired. However, for now, this is sufficient.
            if self.orig_factory.__class__ != KnitPlainFactory:
                raise errors.KnitCorrupt(
                    self, 'Bad thunk request %r cannot be backed by %r' %
                        (version_id, self.orig_factory))
            lines = self.backing_knit.get_lines(version_id)
            line_bytes = ''.join(lines)
            digest = sha_string(line_bytes)
            # the packed form of the fulltext always has a trailing newline,
            # even if the actual text does not, unless the file is empty. the
            # record options including the noeol flag are passed through by
            # _StreamIndex, so this is safe.
            if lines:
                if lines[-1][-1] != '\n':
                    lines[-1] = lines[-1] + '\n'
                    line_bytes += '\n'
            # We want plain data, because we expect to thunk only to allow
            # text extraction.
            size, bytes = self.backing_knit._data._record_to_data(version_id,
                digest, lines, line_bytes)
            yield bytes


class _StreamIndex(object):
    """A Knit Index object that uses the data map from a datastream."""

    def __init__(self, data_list, backing_index):
        """Create a _StreamIndex object.

        :param data_list: The data_list from the datastream.
        :param backing_index: The index which will supply values for nodes
            referenced outside of this stream.
        """
        self.data_list = data_list
        self.backing_index = backing_index
        self._by_version = {}
        pos = 0
        for key, options, length, parents in data_list:
            self._by_version[key] = options, (pos, pos + length), parents
            pos += length

    def get_ancestry(self, versions, topo_sorted):
        """Get an ancestry list for versions."""
        if topo_sorted:
            # Not needed for basic joins
            raise NotImplementedError(self.get_ancestry)
        # get a graph of all the mentioned versions:
        # Little ugly - basically copied from KnitIndex, but don't want to
        # accidentally incorporate too much of that index's code.
        ancestry = set()
        pending = set(versions)
        cache = self._by_version
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][2] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in ancestry])
            ancestry.add(version)
        return list(ancestry)

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
            parents, record_details).
            index_memo
                opaque memo that can be passed to _StreamAccess.read_records
                to extract the raw data; for these classes it is
                (from_backing_knit, index, start, end)
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            try:
                method = self.get_method(version_id)
            except errors.RevisionNotPresent:
                # ghosts are omitted
                continue
            parent_ids = self.get_parents_with_ghosts(version_id)
            noeol = ('no-eol' in self.get_options(version_id))
            index_memo = self.get_position(version_id)
            from_backing_knit = index_memo[0]
            if from_backing_knit:
                # texts retrieved from the backing knit are always full texts
                method = 'fulltext'
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parent_ids[0]
            result[version_id] = (index_memo, compression_parent,
                                  parent_ids, (method, noeol))
        return result

    def get_method(self, version_id):
        """Return compression method of specified version."""
        options = self.get_options(version_id)
        if 'fulltext' in options:
            return 'fulltext'
        elif 'line-delta' in options:
            return 'line-delta'
        else:
            raise errors.KnitIndexUnknownMethod(self, options)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        try:
            return self._by_version[version_id][0]
        except KeyError:
            options = list(self.backing_index.get_options(version_id))
            if 'fulltext' in options:
                pass
            elif 'line-delta' in options:
                # Texts from the backing knit are always returned from the
                # stream as full texts
                options.remove('line-delta')
                options.append('fulltext')
            else:
                raise errors.KnitIndexUnknownMethod(self, options)
            return tuple(options)

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        pending_ids = set()
        for version_id in version_ids:
            try:
                result[version_id] = self._by_version[version_id][2]
            except KeyError:
                pending_ids.add(version_id)
        result.update(self.backing_index.get_parent_map(pending_ids))
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def get_position(self, version_id):
        """Return details needed to access the version.

        _StreamAccess has the data as a big array, so we return slice
        coordinates into that (as index_memo's are opaque outside the
        index and matching access class).

        :return: a tuple (from_backing_knit, index, start, end) that can
            be passed e.g. to get_raw_records.
            If from_backing_knit is False, index will be self, otherwise it
            will be a version id.
        """
        try:
            start, end = self._by_version[version_id][1]
            return False, self, start, end
        except KeyError:
            # Signal to the access object to handle this from the backing knit.
            return (True, version_id, None, None)
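
    # Example (hypothetical): for a version held in the stream at bytes
    # 0..482, get_position returns (False, self, 0, 482); for a version only
    # available in the backing knit it returns (True, 'rev-id', None, None).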

    def get_versions(self):
        """Get all the versions in the stream."""
        return self._by_version.keys()


class _KnitData(object):
    """Manage extraction of data from a KnitAccess, caching and decompressing.

    The KnitData class provides the logic for parsing and using knit records,
    making use of an access method for the low level read and write operations.
    """

    def __init__(self, access):
        """Create a KnitData object.

        :param access: The access method to use. Access methods such as
            _KnitAccess manage the insertion of raw records and the subsequent
            retrieval of the same.
        """
        self._access = access
        self._checked = False

    def _open_file(self):
        return self._access.open_file()

    def _record_to_data(self, version_id, digest, lines, dense_lines=None):
        """Convert version_id, digest, lines into a raw data block.

        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in \n,
            dense_lines may be a list with one line in it, containing all the
            1000 lines and their \n's. Using dense_lines if it is already
            known is a win because the string join to create bytes in this
            function spends less time resizing the final string.
        :return: (len, a bytestring containing the compressed data).
        """
        # Note: using a string copy here increases memory pressure with e.g.
        # ISO's, but it is about 3 seconds faster on a 1.2GHz intel machine
        # when doing the initial commit of a mozilla tree. RBC 20070921
        bytes = ''.join(chain(
            ["version %s %d %s\n" % (version_id,
                                     len(lines),
                                     digest)],
            dense_lines or lines,
            ["end %s\n" % version_id]))
        assert bytes.__class__ == str
        compressed_bytes = bytes_to_gzip(bytes)
        return len(compressed_bytes), compressed_bytes
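
    # Example (hypothetical record, shown uncompressed): for a two-line text
    # the gzipped block produced above contains
    #   version rev-1 2 <sha1-of-text>
    #   first line
    #   second line
    #   end rev-1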

    def add_raw_records(self, sizes, raw_data):
        """Append a prepared record to the data file.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: a list of index data for the way the data was stored.
            See the access method add_raw_records documentation for more
            details.
        """
        return self._access.add_raw_records(sizes, raw_data)

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream,
            as (stream, header_record).
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        try:
            rec = self._check_header(version_id, df.readline())
        except Exception, e:
            raise KnitCorrupt(self._access,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        return df, rec

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                              'unexpected number of elements in record header')
        return rec

    def _check_header_version(self, rec, version_id):
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))

    def _check_header(self, version_id, line):
        rec = self._split_header(line)
        self._check_header_version(rec, version_id)
        return rec

    def _parse_record_unchecked(self, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))
        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 rec[1]))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, rec[1]))
        df.close()
        return rec, record_contents

    def _parse_record(self, version_id, data):
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.

        Each item the iterator yields is (version_id, bytes,
        sha1_of_full_text).
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            needed_offsets = [index_memo for version_id, index_memo
                                         in records]
            raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            data = raw_records.next()
            # validate the header
            df, rec = self._parse_record_header(version_id, data)
            df.close()
            yield version_id, data, rec[3]

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever order is fastest to read,
        not the order requested. Also, multiple requests for the same
        record will only yield 1 response.

        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
            read, not the order requested
        """
        if not records:
            return

        needed_records = sorted(set(records), key=operator.itemgetter(1))
        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for version_id, index_memo in needed_records])

        for (version_id, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(version_id, data)
            yield version_id, content, digest
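
    # Example (hypothetical): records [('rev-2', (None, 500, 200)),
    # ('rev-1', (None, 0, 500))] are re-sorted by position, so the reads
    # issued are (0, 500) then (500, 200), and results are yielded in that
    # read order rather than the requested order.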

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components


class InterKnit(InterVersionedFile):
    """Optimised code paths for knit to knit operations."""

    _matching_file_from_factory = staticmethod(make_file_knit)
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with knits."""
        try:
            return (isinstance(source, KnitVersionedFile) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
        """Copy texts to the target by extracting and adding them one by one.

        See join() for the parameter definitions.
        """
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        # --- the below is factorable out with VersionedFile.join, but wait for
        # VersionedFiles, it may all be simpler then.
        graph = Graph(self.source)
        search = graph._make_breadth_first_searcher(version_ids)
        transitive_ids = set()
        map(transitive_ids.update, list(search))
        parent_map = self.source.get_parent_map(transitive_ids)
        order = topo_sort(parent_map.items())

        def size_of_content(content):
            return sum(len(line) for line in content.text())
        # Cache at most 10MB of parent texts
        parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                              compute_size=size_of_content)
        # TODO: jam 20071116 It would be nice to have a streaming interface to
        #       get multiple texts from a source. The source could be smarter
        #       about how it handled intermediate stages.
        #       get_line_list() or make_mpdiffs() seem like a possibility, but
        #       at the moment they extract all full texts into memory, which
        #       causes us to store more than our 3x fulltext goal.
        #       Repository.iter_files_bytes() may be another possibility
        to_process = [version for version in order
                      if version not in self.target]
        total = len(to_process)
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for index, version in enumerate(to_process):
                pb.update('Converting versioned data', index, total)
                sha1, num_bytes, parent_text = self.target.add_lines(version,
                    self.source.get_parents_with_ghosts(version),
                    self.source.get_lines(version),
                    parent_texts=parent_cache)
                parent_cache[version] = parent_text
        finally:
            pb.finished()
        return total

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, KnitVersionedFile)
        assert isinstance(self.target, KnitVersionedFile)

        # If the source and target are mismatched w.r.t. annotations vs
        # plain, the data needs to be converted accordingly.
        if self.source.factory.annotated == self.target.factory.annotated:
            converter = None
        elif self.source.factory.annotated:
            converter = self._anno_to_plain_converter
        else:
            # We're converting from a plain to an annotated knit. Copy them
            # across by full texts.
            return self._copy_texts(pb, msg, version_ids, ignore_missing)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)
            if None in version_ids:
                version_ids.remove(None)

            self.source_ancestry = set(self.source.get_ancestry(version_ids,
                topo_sorted=False))
            this_versions = set(self.target._index.get_versions())
            # XXX: For efficiency we should not look at the whole index,
            #      we only need to consider the referenced revisions - they
            #      must all be present, or the method must be full-text.
            #      TODO, RBC 20070919
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # plan the join:
            copy_queue_records = []
            copy_queue = []
            copy_set = set()
            for version_id in version_list:
                options = self.source._index.get_options(version_id)
                parents = self.source._index.get_parents_with_ghosts(version_id)
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must:
                    # * already have it or
                    # * have it scheduled already
                    # otherwise we don't care
                    assert (self.target.has_version(parent) or
                            parent in copy_set or
                            not self.source.has_version(parent))
                index_memo = self.source._index.get_position(version_id)
                copy_queue_records.append((version_id, index_memo))
                copy_queue.append((version_id, options, parents))
                copy_set.add(version_id)

            # data suck the join:
            count = 0
            total = len(version_list)
            raw_datum = []
            raw_records = []
            for (version_id, raw_data, _), \
                (version_id2, options, parents) in \
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
                     copy_queue):
                assert version_id == version_id2, 'logic error, inconsistent results'
                count = count + 1
                pb.update("Joining knit", count, total)
                if converter:
                    size, raw_data = converter(raw_data, version_id, options,
                                               parents)
                else:
                    size = len(raw_data)
                raw_records.append((version_id, options, parents, size))
                raw_datum.append(raw_data)
            self.target._add_raw_records(raw_records, ''.join(raw_datum))
            return count
        finally:
            pb.finished()

    def _anno_to_plain_converter(self, raw_data, version_id, options,
                                 parents):
        """Convert annotated content to plain content."""
        data, digest = self.source._data._parse_record(version_id, raw_data)
        if 'fulltext' in options:
            content = self.source.factory.parse_fulltext(data, version_id)
            lines = self.target.factory.lower_fulltext(content)
        else:
            delta = self.source.factory.parse_line_delta(data, version_id,
                                                         plain=True)
            lines = self.target.factory.lower_line_delta(delta)
        return self.target._data._record_to_data(version_id, digest, lines)


InterVersionedFile.register_optimiser(InterKnit)


class WeaveToKnit(InterVersionedFile):
    """Optimised code paths for weave to knit operations."""

    _matching_file_from_factory = bzrlib.weave.WeaveFile
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with weaves to knits."""
        try:
            return (isinstance(source, bzrlib.weave.Weave) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, bzrlib.weave.Weave)
        assert isinstance(self.target, KnitVersionedFile)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)

        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)

            self.source_ancestry = set(self.source.get_ancestry(version_ids))
            this_versions = set(self.target._index.get_versions())
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # do the join:
            count = 0
            total = len(version_list)
            parent_map = self.source.get_parent_map(version_list)
            for version_id in version_list:
                pb.update("Converting to knit", count, total)
                parents = parent_map[version_id]
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must already have it
                    assert (self.target.has_version(parent))
                self.target.add_lines(
                    version_id, parents, self.source.get_lines(version_id))
                count = count + 1
            return count
        finally:
            pb.finished()


InterVersionedFile.register_optimiser(WeaveToKnit)


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

    This implementation is for knits with no cached annotations.
    It will work for knits with cached annotations, but this is not
    recommended.
    """
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate(revision_id))


class _KnitAnnotator(object):
    """Build up the annotations for a text."""

    def __init__(self, knit):
        self._knit = knit

        # Content objects, differs from fulltexts because of how final
        # newlines are treated by knits: the content objects here will always
        # have a trailing newline.
        self._fulltext_contents = {}

        # Annotated lines of specific revisions
        self._annotated_lines = {}

        # Track the raw data for nodes that we could not process yet.
        # This maps the revision_id of the base to a list of children that
        # will be annotated from it.
        self._pending_children = {}

        # Nodes which cannot be extracted
        self._ghosts = set()

        # Track how many children this node has, so we know if we need to
        # keep it.
        self._annotate_children = {}
        self._compression_children = {}

        self._all_build_details = {}
        # The children => parent revision_id graph
        self._revision_id_graph = {}

        self._heads_provider = None

        self._nodes_to_keep_annotations = set()
        self._generations_until_keep = 100

    def set_generations_until_keep(self, value):
        """Set the number of generations before caching a node.

        Setting this to -1 will cache every merge node, setting this higher
        will cache fewer nodes.
        """
        self._generations_until_keep = value

    def _add_fulltext_content(self, revision_id, content_obj):
        self._fulltext_contents[revision_id] = content_obj
        # TODO: jam 20080305 It might be good to check the sha1digest here
        return content_obj.text()

    def _check_parents(self, child, nodes_to_annotate):
        """Check if all parents have been processed.

        :param child: A tuple of (rev_id, parents, raw_content)
        :param nodes_to_annotate: If child is ready, add it to
            nodes_to_annotate, otherwise put it back in self._pending_children
        """
        for parent_id in child[1]:
            if (parent_id not in self._annotated_lines):
                # This parent is missing; defer the child until it arrives
                self._pending_children.setdefault(parent_id,
                                                  []).append(child)
                break
        else:
            # This one is ready to be processed
            nodes_to_annotate.append(child)
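
    # Example (hypothetical): child = ('rev-3', ['rev-1', 'rev-2'], data) is
    # appended to nodes_to_annotate only once both 'rev-1' and 'rev-2' appear
    # in self._annotated_lines; otherwise it is parked in
    # self._pending_children under the first missing parent.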

    def _add_annotation(self, revision_id, fulltext, parent_ids,
                        left_matching_blocks=None):
        """Add an annotation entry.

        All parents should already have been annotated.
        :return: A list of children that now have their parents satisfied.
        """
        a = self._annotated_lines
        annotated_parent_lines = [a[p] for p in parent_ids]
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
                               fulltext, revision_id, left_matching_blocks,
                               heads_provider=self._get_heads_provider()))
        self._annotated_lines[revision_id] = annotated_lines
        for p in parent_ids:
            ann_children = self._annotate_children[p]
            ann_children.remove(revision_id)
            if (not ann_children
                and p not in self._nodes_to_keep_annotations):
                del self._annotated_lines[p]
                del self._all_build_details[p]
                if p in self._fulltext_contents:
                    del self._fulltext_contents[p]
        # Now that we've added this one, see if there are any pending
        # deltas to be done, certainly this parent is finished
        nodes_to_annotate = []
        for child in self._pending_children.pop(revision_id, []):
            self._check_parents(child, nodes_to_annotate)
        return nodes_to_annotate

    def _get_build_graph(self, revision_id):
        """Get the graphs for building texts and annotations.

        The data you need for creating a full text may be different than the
        data you need to annotate that text. (At a minimum, you need both
        parents to create an annotation, but only need 1 parent to generate
        the fulltext.)

        :return: A list of (revision_id, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data
            from the pack file.
        """
        if revision_id in self._annotated_lines:
            # Nothing to do
            return []
        pending = set([revision_id])
        records = []
        generation = 0
        kept_generation = 0
        while pending:
            # get all pending nodes
            generation += 1
            this_iteration = pending
            build_details = self._knit._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._knit._index._get_entries(this_iteration)
            pending = set()
            for rev_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                self._revision_id_graph[rev_id] = parents
                records.append((rev_id, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update(p for p in parents
                                 if p not in self._all_build_details)
                if compression_parent:
                    self._compression_children.setdefault(compression_parent,
                        []).append(rev_id)
                for parent in parents:
                    self._annotate_children.setdefault(parent,
                        []).append(rev_id)
                num_gens = generation - kept_generation
                if ((num_gens >= self._generations_until_keep)
                    and len(parents) > 1):
                    kept_generation = generation
                    self._nodes_to_keep_annotations.add(rev_id)

            missing_versions = this_iteration.difference(build_details.keys())
            self._ghosts.update(missing_versions)
            for missing_version in missing_versions:
                # add a key, no parents
                self._revision_id_graph[missing_version] = ()
                pending.discard(missing_version) # don't look for it
        # XXX: This should probably be a real exception, as it is a data
        #      inconsistency
        assert not self._ghosts.intersection(self._compression_children), \
            "We cannot have nodes which have a compression parent of a ghost."
        # Cleanout anything that depends on a ghost so that we don't wait for
        # the ghost to show up
        for node in self._ghosts:
            if node in self._annotate_children:
                # We won't be building this node
                del self._annotate_children[node]
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records

    def _annotate_records(self, records):
        """Build the annotations for the listed records."""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        for (rev_id, record,
             digest) in self._knit._data.read_records_iter(records):
            if rev_id in self._annotated_lines:
                continue
            parent_ids = self._revision_id_graph[rev_id]
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
            details = self._all_build_details[rev_id]
            (index_memo, compression_parent, parents,
             record_details) = details
            nodes_to_annotate = []
            # TODO: Remove the punning between compression parents, and
            #       parent_ids, we should be able to do this without assuming
            if len(parent_ids) == 0:
                # There are no parents for this node, so just add it
                # TODO: This probably needs to be decoupled
                assert compression_parent is None
                fulltext_content, delta = self._knit.factory.parse_record(
                    rev_id, record, record_details, None)
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
                    parent_ids, left_matching_blocks=None))
            else:
                child = (rev_id, parent_ids, record)
                # Check if all the parents are present
                self._check_parents(child, nodes_to_annotate)
            while nodes_to_annotate:
                # Should we use a queue here instead of a stack?
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
                (index_memo, compression_parent, parents,
                 record_details) = self._all_build_details[rev_id]
                if compression_parent is not None:
                    comp_children = self._compression_children[compression_parent]
                    assert rev_id in comp_children
                    # If there is only 1 child, it is safe to reuse this
                    # content
                    reuse_content = (len(comp_children) == 1
                        and compression_parent not in
                            self._nodes_to_keep_annotations)
                    if reuse_content:
                        # Remove it from the cache since it will be changing
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
                        # Make sure to copy the fulltext since it might be
                        # modified
                        parent_fulltext = list(parent_fulltext_content.text())
                    else:
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
                        parent_fulltext = parent_fulltext_content.text()
                    comp_children.remove(rev_id)
                    fulltext_content, delta = self._knit.factory.parse_record(
                        rev_id, record, record_details,
                        parent_fulltext_content,
                        copy_base_content=(not reuse_content))
                    fulltext = self._add_fulltext_content(rev_id,
                                                          fulltext_content)
                    blocks = KnitContent.get_line_delta_blocks(delta,
                            parent_fulltext, fulltext)
                else:
                    fulltext_content = self._knit.factory.parse_fulltext(
                        record, rev_id)
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                    blocks = None
                nodes_to_annotate.extend(
                    self._add_annotation(rev_id, fulltext, parent_ids,
                                         left_matching_blocks=blocks))

    def _get_heads_provider(self):
        """Create a heads provider for resolving ancestry issues."""
        if self._heads_provider is not None:
            return self._heads_provider
        parent_provider = _mod_graph.DictParentsProvider(
            self._revision_id_graph)
        graph_obj = _mod_graph.Graph(parent_provider)
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
        self._heads_provider = head_cache
        return head_cache

    def annotate(self, revision_id):
        """Return the annotated fulltext at the given revision.

        :param revision_id: The revision id for this file
        """
        records = self._get_build_graph(revision_id)
        if revision_id in self._ghosts:
            raise errors.RevisionNotPresent(revision_id, self._knit)
        self._annotate_records(records)
        return self._annotated_lines[revision_id]


try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data