# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append only
updates.

lifeless: the data file is made up of "delta records".  each delta record has a delta header
that contains: (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with an
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8         e.set('executable', 'yes')
8         if elt.get('executable') == 'yes':
8             ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation
"""
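
# A short sketch of the record framing described above; the helper name
# parse_record_header is hypothetical, not part of this module.  The
# "version" header carries the version id, the delta's size in lines, and
# the digest of the expanded text; "end VERSION" closes the record.
#
#   def parse_record_header(line):
#       # 'version <id> <lines> <digest>' -> (id, line count, digest)
#       keyword, version_id, line_count, digest = line.split(' ')
#       return version_id, int(line_count), digest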

# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?

from cStringIO import StringIO
from itertools import izip, chain
from zlib import Z_DEFAULT_COMPRESSION

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib.errors import (
    RevisionAlreadyPresent,
from bzrlib.graph import Graph
from bzrlib.osutils import (
from bzrlib.symbol_versioning import (
    DEPRECATED_PARAMETER,
from bzrlib.tsort import topo_sort
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
from bzrlib.versionedfile import (
    AbsentContentFactory,

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'


class KnitAdapter(object):
    """Base class for knit record adaptation."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = _KnitData(None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
        return bytes


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
        return bytes


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to plain full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter for deltas from annotated knits to full texts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter for deltas from plain knits to full texts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, version, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for version.

        :param version: The version.
        :param parents: The parents.
        :param build_details: The build details as returned from
            get_build_details.
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.sha1 = sha1
        self.key = (version,)
        self.parents = tuple((parent,) for parent in parents)
        if build_details[0] == 'line-delta':
            kind = 'delta'
        else:
            kind = 'ft'
        if annotated:
            annotated_kind = 'annotated-'
        else:
            annotated_kind = ''
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details
        self._knit = knit

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        else:
            raise errors.UnavailableRepresentation(self.key, storage_kind,
                self.storage_kind)
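
# For instance (following the naming scheme assembled above, with
# illustrative inputs): a 'line-delta' record from an annotated knit is
# streamed with storage_kind 'knit-annotated-delta-gz', while a fulltext
# record from a plain knit is streamed as 'knit-ft-gz'.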


class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""

    def __init__(self):
        self._should_strip_eol = False

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)

    def cleanup_eol(self, copy_on_mutate=True):
        if self._should_strip_eol:
            if copy_on_mutate:
                self._lines = self._lines[:]
            self.strip_last_line_newline()

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'equal':
                continue
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
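
    # A hunk yielded above has the form (start, end, count, lines); e.g.
    # (0, 1, 2, ['first\n', 'second\n']) replaces line 0 of the old text with
    # the two new lines.  (Illustrative values, not drawn from a real knit.)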

    @staticmethod
    def get_line_delta_blocks(knit_delta, source, target):
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta."""
        target_len = len(target)
        s_pos = 0
        t_pos = 0
        for s_begin, s_end, t_len, new_text in knit_delta:
            true_n = s_begin - s_pos
            n = true_n
            if n > 0:
                # knit deltas do not provide reliable info about whether the
                # last line of a file matches, due to eol handling.
                if source[s_pos + n - 1] != target[t_pos + n - 1]:
                    n -= 1
                if n > 0:
                    yield s_pos, t_pos, n
            t_pos += t_len + true_n
            s_pos = s_end
        n = target_len - t_pos
        if n > 0:
            if source[s_pos + n - 1] != target[t_pos + n - 1]:
                n -= 1
            if n > 0:
                yield s_pos, t_pos, n
        yield s_pos + (target_len - t_pos), target_len, 0


class AnnotatedKnitContent(KnitContent):
    """Annotated content."""

    def __init__(self, lines):
        KnitContent.__init__(self)
        self._lines = lines

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return list(self._lines)

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
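
    # For example, applying the single hunk (1, 3, 1, [new_line]) replaces
    # old lines 1-2 with one new line, after which offset shifts by
    # (1 - 3) + 1 = -1 so that later hunks index the partially mutated list
    # correctly.  (Illustrative values.)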

    def strip_last_line_newline(self):
        line = self._lines[-1][1].rstrip('\n')
        self._lines[-1] = (self._lines[-1][0], line)
        self._should_strip_eol = False

    def text(self):
        try:
            lines = [text for origin, text in self._lines]
        except ValueError, e:
            # most commonly (only?) caused by the internal form of the knit
            # missing annotation information because of a bug - see thread
            raise KnitCorrupt(self,
                "line in annotated knit missing annotation information: %s"
                % (e,))
        if self._should_strip_eol:
            lines[-1] = lines[-1].rstrip('\n')
        return lines

    def copy(self):
        return AnnotatedKnitContent(self._lines[:])


class PlainKnitContent(KnitContent):
    """Unannotated content.

    When annotate[_iter] is called on this content, the same version is
    reported for all lines.  Generally, annotate[_iter] is not useful on
    PlainKnitContent objects.
    """

    def __init__(self, lines, version_id):
        KnitContent.__init__(self)
        self._lines = lines
        self._version_id = version_id

    def annotate(self):
        """Return a list of (origin, text) for each content line."""
        return [(self._version_id, line) for line in self._lines]

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
        self._version_id = new_version_id

    def copy(self):
        return PlainKnitContent(self._lines[:], self._version_id)

    def strip_last_line_newline(self):
        self._lines[-1] = self._lines[-1].rstrip('\n')
        self._should_strip_eol = False

    def text(self):
        lines = self._lines
        if self._should_strip_eol:
            lines = lines[:]
            lines[-1] = lines[-1].rstrip('\n')
        return lines


class _KnitFactory(object):
    """Base class for common Factory functions."""

    def parse_record(self, version_id, record, record_details,
                     base_content, copy_base_content=True):
        """Parse a record into a full content object.

        :param version_id: The official version id for this content
        :param record: The data returned by read_records_iter()
        :param record_details: Details about the record returned by
            get_build_details
        :param base_content: If get_build_details returns a compression_parent,
            you must provide a base_content here, else use None
        :param copy_base_content: When building from the base_content, decide
            whether to copy it and return a new object, or to modify it in
            place.
        :return: (content, delta) A Content object and possibly a line-delta,
            delta may be None
        """
        method, noeol = record_details
        if method == 'line-delta':
            if copy_base_content:
                content = base_content.copy()
            else:
                content = base_content
            delta = self.parse_line_delta(record, version_id)
            content.apply_delta(delta, version_id)
        else:
            content = self.parse_fulltext(record, version_id)
            delta = None
        content._should_strip_eol = noeol
        return (content, delta)
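
    # A sketch of how a delta record is materialised with the method above
    # (values are illustrative): parsing with record_details
    # ('line-delta', False) applies the parsed line-delta to a copy of the
    # basis content and returns (new_content, delta); with
    # ('fulltext', False) the record is parsed directly and delta is None.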


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    annotated = True

    def make(self, lines, version_id):
        num_lines = len(lines)
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))

    def parse_fulltext(self, content, version_id):
        """Convert fulltext to internal representation.

        fulltext content is of the format
        revid(utf8) plaintext\n
        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return AnnotatedKnitContent(lines)

    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))

    def parse_line_delta(self, lines, version_id, plain=False):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])

        :param plain: If True, the lines are returned as a plain
            list without annotations, not as a list of (origin, content)
            tuples, i.e. (start, end, count, [1..count newline])
        """
        result = []
        lines = iter(lines)
        next = lines.next

        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
                result.append((start, end, count, contents))
        return result
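
    # A worked example of the wire form above (illustrative values): the
    # serialized hunk "1,2,1\nrevid new text\n" parses to
    # [(1, 2, 1, [('revid', 'new text\n')])], or with plain=True to
    # [(1, 2, 1, ['new text\n'])].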

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text

    def lower_fulltext(self, content):
        """Convert a fulltext content record into a serializable form.

        See parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """Convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out

    def annotate(self, knit, version_id):
        content = knit._get_content(version_id)
        return content.annotate()


class KnitPlainFactory(_KnitFactory):
    """Factory for creating plain Content objects."""

    annotated = False

    def make(self, lines, version_id):
        return PlainKnitContent(lines, version_id)

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, lines[cur:cur+c]
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()

    def lower_fulltext(self, content):
        return content.text()

    def lower_line_delta(self, delta):
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(lines)
        return out

    def annotate(self, knit, version_id):
        annotator = _KnitAnnotator(knit)
        return annotator.annotate(version_id)


def make_empty_knit(transport, relpath):
    """Construct an empty knit at the specified location."""
    k = make_file_knit(relpath, transport, access_mode='w',
        factory=KnitPlainFactory())


def make_file_knit(name, transport, file_mode=None, access_mode='w',
    factory=None, delta=True, create=False, create_parent_dir=False,
    delay_create=False, dir_mode=None, get_scope=None):
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
    if factory is None:
        factory = KnitAnnotateFactory()
    if get_scope is None:
        get_scope = lambda: None
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
        access_mode, create=create, file_mode=file_mode,
        create_parent_dir=create_parent_dir, delay_create=delay_create,
        dir_mode=dir_mode, get_scope=get_scope)
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
        dir_mode, ((create and not len(index)) and delay_create),
        create_parent_dir)
    return KnitVersionedFile(name, transport, factory=factory,
        create=create, delay_create=delay_create, index=index,
        access_method=access)


def get_suffixes():
    """Return the suffixes used by file based knits."""
    return [DATA_SUFFIX, INDEX_SUFFIX]
make_file_knit.get_suffixes = get_suffixes
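
# A minimal usage sketch (the transport URL and names are illustrative):
#
#   from bzrlib.transport import get_transport
#   knit = make_file_knit('inventory', get_transport('file:///tmp/repo'),
#                         create=True)
#   knit.add_lines('rev-1', [], ['hello\n'])
#   assert knit.get_text('rev-1') == 'hello\n'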


class KnitVersionedFile(VersionedFile):
    """Weave-like structure with faster random access.

    A knit stores a number of texts and a summary of the relationships
    between them.  Texts are identified by a string version-id.  Texts
    are normally stored and retrieved as a series of lines, but can
    also be passed as single strings.

    Lines are stored with the trailing newline (if any) included, to
    avoid special cases for files with no final newline.  Lines are
    composed of 8-bit characters, not unicode.  The combination of
    these approaches should mean any 'binary' file can be safely
    stored and retrieved.
    """

    def __init__(self, relpath, transport, file_mode=None,
        factory=None, delta=True, create=False, create_parent_dir=False,
        delay_create=False, dir_mode=None, index=None, access_method=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        :param index: An index to use for the knit.
        """
        super(KnitVersionedFile, self).__init__()
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.delta = delta
        self._max_delta_chain = 200

        if None in (access_method, index):
            raise ValueError("No default access_method or index any more")
        self._index = index
        _access = access_method
        if create and not len(self) and not delay_create:
            _access.create()
        self._data = _KnitData(_access)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
            self.transport.abspath(self.filename))

    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext.  It searches for _max_delta_chain parents.  When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            index, pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parent_map([parent])[parent]
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size
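
    # For example (illustrative sizes): with three 2KB deltas stacked on a
    # 5KB fulltext, delta_size is 6KB and fulltext_size is 5KB, so the check
    # returns False and a new fulltext is stored instead of a fourth delta.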

    def _check_write_ok(self):
        return self._index._check_write_ok()

    def _add_raw_records(self, records, data):
        """Add all the records 'records' with data pre-joined in 'data'.

        :param records: A list of tuples (version_id, options, parents, size).
        :param data: The data for the records. When it is written, the records
            are adjusted to have pos pointing into data by the sum of
            the preceding records sizes.
        """
        # write all the data
        raw_record_sizes = [record[3] for record in records]
        positions = self._data.add_raw_records(raw_record_sizes, data)
        index_entries = []
        for (version_id, options, parents, _), access_memo in zip(
                records, positions):
            index_entries.append((version_id, options, access_memo, parents))
        self._index.add_versions(index_entries)

    def copy_to(self, name, transport):
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
            self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

    def get_data_stream(self, required_versions):
        """Get a data stream for the specified versions.

        Versions may be returned in any order, not necessarily the order
        specified.  They are returned in a partial order by compression
        parent, so that the deltas can be applied as the data stream is
        inserted; however note that compression parents will not be sent
        unless they were specifically requested, as the client may already
        have them.

        :param required_versions: The exact set of versions to be extracted.
            Unlike some other knit methods, this is not used to generate a
            transitive closure, rather it is used precisely as given.

        :returns: format_signature, list of (version, options, length, parents),
            reader_callable.
        """
        required_version_set = frozenset(required_versions)
        version_index = {}
        # list of revisions that can just be sent without waiting for their
        # compression parent
        ready_to_send = []
        # map from revision to the children based on it
        deferred = {}
        # first, read all relevant index data, enough to sort into the right
        # order to return
        for version_id in required_versions:
            options = self._index.get_options(version_id)
            parents = self._index.get_parents_with_ghosts(version_id)
            index_memo = self._index.get_position(version_id)
            version_index[version_id] = (index_memo, options, parents)
            if ('line-delta' in options
                and parents[0] in required_version_set):
                # must wait until the parent has been sent
                deferred.setdefault(parents[0], []). \
                    append(version_id)
            else:
                # either a fulltext, or a delta whose parent the client did
                # not ask for and presumably already has
                ready_to_send.append(version_id)
        # build a list of results to return, plus instructions for data to
        # read from the file
        copy_queue_records = []
        temp_version_list = []
        while ready_to_send:
            # XXX: pushing and popping lists may be a bit inefficient
            version_id = ready_to_send.pop(0)
            (index_memo, options, parents) = version_index[version_id]
            copy_queue_records.append((version_id, index_memo))
            none, data_pos, data_size = index_memo
            temp_version_list.append((version_id, options, data_size,
                parents))
            if version_id in deferred:
                # now we can send all the children of this revision - we could
                # put them in anywhere, but we hope that sending them soon
                # after the fulltext will give good locality in the receiver
                ready_to_send[:0] = deferred.pop(version_id)
        if len(deferred) != 0:
            raise AssertionError("Still have compressed child versions waiting to be sent")
        # XXX: The stream format is such that we cannot stream it - we have to
        # know the length of all the data a-priori.
        raw_datum = []
        result_version_list = []
        for (version_id, raw_data, _), \
            (version_id2, options, _, parents) in \
            izip(self._data.read_records_iter_raw(copy_queue_records),
                 temp_version_list):
            if version_id != version_id2:
                raise AssertionError('logic error, inconsistent results')
            raw_datum.append(raw_data)
            result_version_list.append(
                (version_id, options, len(raw_data), parents))
        # provide a callback to get data incrementally.
        pseudo_file = StringIO(''.join(raw_datum))
        def read(length):
            if length is None:
                return pseudo_file.read()
            else:
                return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)
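
    # For example (illustrative ids): requesting {A, B} where B is a
    # line-delta on A emits A's record before B's, while a delta whose
    # parent was not requested is sent immediately, on the assumption that
    # the client already has that parent.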

    def get_record_stream(self, versions, ordering, include_delta_closure):
        """Get a stream of records for versions.

        :param versions: The versions to include. Each version is a tuple
            (version,).
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        if include_delta_closure:
            # Nb: what we should do is plan the data to stream to allow
            # reconstruction of all the texts without excessive buffering,
            # including re-sending common bases as needed. This makes the most
            # sense when we start serialising these streams though, so for now
            # we just fallback to individual text construction behind the
            # abstraction barrier.
            knit = self
        else:
            knit = None
        # We end up doing multiple index lookups here for parents details and
        # disk layout details - we need a unified api ?
        parent_map = self.get_parent_map(versions)
        absent_versions = set(versions) - set(parent_map)
        if ordering == 'topological':
            present_versions = topo_sort(parent_map)
        else:
            # List comprehension to keep the requested order (as that seems
            # marginally useful, at least until we start doing IO optimising
            # here.
            present_versions = [version for version in versions if version in
                parent_map]
        position_map = self._get_components_positions(present_versions)
        records = [(version, position_map[version][1]) for version in
            present_versions]
        for version in absent_versions:
            yield AbsentContentFactory((version,))
        for version, raw_data, sha1 in \
                self._data.read_records_iter_raw(records):
            (record_details, index_memo, _) = position_map[version]
            yield KnitContentFactory(version, parent_map[version],
                record_details, sha1, raw_data, self.factory.annotated, knit)

    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None
        parent, sha1, noeol, delta = self.get_delta(version_id)
        return KnitContent.get_line_delta_blocks(delta, source, target)

    def get_delta(self, version_id):
        """Get a delta for constructing version from some other version."""
        self.check_not_reserved_id(version_id)
        parents = self.get_parent_map([version_id])[version_id]
        if len(parents):
            parent = parents[0]
        else:
            parent = None
        index_memo = self._index.get_position(version_id)
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
        noeol = 'no-eol' in self._index.get_options(version_id)
        if 'fulltext' == self._index.get_method(version_id):
            new_content = self.factory.parse_fulltext(data, version_id)
            if parent is not None:
                reference_content = self._get_content(parent)
                old_texts = reference_content.text()
            else:
                old_texts = []
            new_texts = new_content.text()
            delta_seq = patiencediff.PatienceSequenceMatcher(None, old_texts,
                new_texts)
            return parent, sha1, noeol, self._make_line_delta(delta_seq, new_content)
        else:
            delta = self.factory.parse_line_delta(data, version_id)
            return parent, sha1, noeol, delta

    def get_format_signature(self):
        """See VersionedFile.get_format_signature()."""
        if self.factory.annotated:
            annotated_part = "annotated"
        else:
            annotated_part = "plain"
        return "knit-%s" % (annotated_part,)

    @deprecated_method(one_four)
    def get_graph_with_ghosts(self):
        """See VersionedFile.get_graph_with_ghosts()."""
        return self.get_parent_map(self.versions())

    def get_sha1s(self, version_ids):
        """See VersionedFile.get_sha1s()."""
        record_map = self._get_record_map(version_ids)
        # record entry 2 is the 'digest'.
        return [record_map[v][2] for v in version_ids]

    @deprecated_method(one_four)
    def has_ghost(self, version_id):
        """True if there is a ghost reference in the file to version_id."""
        # maybe we have it
        if self.has_version(version_id):
            return False
        # optimisable if needed by memoising the _ghosts set.
        items = self.get_parent_map(self.versions())
        for parents in items.itervalues():
            for parent in parents:
                if parent == version_id and parent not in items:
                    return True
        return False

    def insert_data_stream(self, (format, data_list, reader_callable)):
        """Insert knit records from a data stream into this knit.

        If a version in the stream is already present in this knit, it will
        not be inserted a second time.  It will be checked for consistency
        with the stored version however, and may cause a KnitCorrupt error
        to be raised if the data in the stream disagrees with the already
        stored data.

        :seealso: get_data_stream
        """
        if format != self.get_format_signature():
            if 'knit' in debug.debug_flags:
                trace.mutter(
                    'incompatible format signature inserting to %r', self)
            source = self._knit_from_datastream(
                (format, data_list, reader_callable))
            stream = source.get_record_stream(source.versions(), 'unordered', False)
            self.insert_record_stream(stream)
            return

        for version_id, options, length, parents in data_list:
            if self.has_version(version_id):
                # First check: the list of parents.
                my_parents = self.get_parents_with_ghosts(version_id)
                if tuple(my_parents) != tuple(parents):
                    # XXX: KnitCorrupt is not quite the right exception here.
                    raise KnitCorrupt(
                        self.filename,
                        'parents list %r from data stream does not match '
                        'already recorded parents %r for %s'
                        % (parents, my_parents, version_id))

                # Also check the SHA-1 of the fulltext this content will
                # produce.
                raw_data = reader_callable(length)
                my_fulltext_sha1 = self.get_sha1s([version_id])[0]
                df, rec = self._data._parse_record_header(version_id, raw_data)
                stream_fulltext_sha1 = rec[3]
                if my_fulltext_sha1 != stream_fulltext_sha1:
                    # Actually, we don't know if it's this knit that's corrupt,
                    # or the data stream we're trying to insert.
                    raise KnitCorrupt(
                        self.filename, 'sha-1 does not match %s' % version_id)
            else:
                if 'line-delta' in options:
                    # Make sure that this knit record is actually useful: a
                    # line-delta is no use unless we have its parent.
                    # Fetching from a broken repository with this problem
                    # shouldn't break the target repository.
                    #
                    # See https://bugs.launchpad.net/bzr/+bug/164443
                    if not self._index.has_version(parents[0]):
                        raise KnitCorrupt(
                            self.filename,
                            'line-delta from stream '
                            'for version %s '
                            'references '
                            'missing parent %s\n'
                            'Try running "bzr check" '
                            'on the source repository, and "bzr reconcile" '
                            'if necessary.' %
                            (version_id, parents[0]))
                    if not self.delta:
                        # We received a line-delta record for a non-delta knit.
                        # Convert it to a fulltext.
                        gzip_bytes = reader_callable(length)
                        self._convert_line_delta_to_fulltext(
                            gzip_bytes, version_id, parents)
                        continue

                self._add_raw_records(
                    [(version_id, options, parents, length)],
                    reader_callable(length))

    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
        delta = self.factory.parse_line_delta(lines, version_id)
        content = self.factory.make(self.get_lines(parents[0]), parents[0])
        content.apply_delta(delta, version_id)
        digest, length, content = self.add_lines(
            version_id, parents, content.text())
        if digest != sha1:
            raise errors.VersionedFileInvalidChecksum(version_id)

    def _knit_from_datastream(self, (format, data_list, reader_callable)):
        """Create a knit object from a data stream.

        This method exists to allow conversion of data streams that do not
        match the signature of this knit.  Generally it will be slower and
        use more memory to use this method to insert data, but it will work.

        :seealso: get_data_stream for details on datastreams.
        :return: A knit versioned file which can be used to join the
            datastream into self.
        """
        if format == "knit-plain":
            factory = KnitPlainFactory()
        elif format == "knit-annotated":
            factory = KnitAnnotateFactory()
        else:
            raise errors.KnitDataStreamUnknown(format)
        index = _StreamIndex(data_list, self._index)
        access = _StreamAccess(reader_callable, index, self, factory)
        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def insert_record_stream(self, stream):
        """Insert a record stream into this versioned file.

        :param stream: A stream of records to insert.
        :seealso VersionedFile.get_record_stream:
        """
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        if self.factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            convertibles = []
        else:
            # self is not annotated, but we can strip annotations cheaply.
            annotated = ""
            convertibles = set(["knit-annotated-delta-gz",
                "knit-annotated-ft-gz"])
        # The set of types we can cheaply adapt without needing basis texts.
        native_types = set()
        native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        adapters = {}
        # Buffer all index entries that we can't add immediately because their
        # basis parent is missing. We don't buffer all because generating
        # annotations may require access to some of the new records. However we
        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # includes the new keys.
        #
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key[0]], self)
            # adapt to non-tuple interface
            parents = [parent[0] for parent in record.parents]
            if record.storage_kind in knit_types:
                if record.storage_kind not in native_types:
                    try:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                    except KeyError:
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                else:
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by having the kndx index support raise on a duplicate
                # add with identical parents and options.
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
                index_entry = (record.key[0], options, access_memo, parents)
                buffered = False
                if 'fulltext' not in options:
                    basis_parent = parents[0]
                    if not self.has_version(basis_parent):
                        pending = buffered_index_entries.setdefault(
                            basis_parent, [])
                        pending.append(index_entry)
                        buffered = True
                if not buffered:
                    self._index.add_versions([index_entry])
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key[0], parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                try:
                    self.add_lines(record.key[0], parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
            # Add any records whose basis parent is now available.
            added_keys = [record.key[0]]
            while added_keys:
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_versions(index_entries)
                    added_keys.extend(
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
                self)
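
    # For example (illustrative record kinds from the sets built above):
    # inserting a 'knit-annotated-ft-gz' record into a plain knit goes
    # through the ('knit-annotated-ft-gz', 'knit-ft-gz') adapter and stores
    # the stripped bytes, while the same record arriving at an annotated
    # knit is a native type and is copied across unchanged.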

    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "versions scales with size of history")
        return self._index.get_versions()

    def has_version(self, version_id):
        """See VersionedFile.has_version."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "has_version is a LBYL scenario")
        return self._index.has_version(version_id)

    __contains__ = has_version

    def _merge_annotations(self, content, parents, parent_texts={},
                           delta=None, annotated=None,
                           left_matching_blocks=None):
        """Merge annotations for content and generate a delta.

        This is done by comparing the annotations based on changes to the
        text.
        """
        if left_matching_blocks is not None:
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
        else:
            delta_seq = None
        if annotated:
            for parent_id in parents:
                merge_content = self._get_content(parent_id, parent_texts)
                if (parent_id == parents[0] and delta_seq is not None):
                    seq = delta_seq
                else:
                    seq = patiencediff.PatienceSequenceMatcher(
                        None, merge_content.text(), content.text())
                for i, j, n in seq.get_matching_blocks():
                    if n == 0:
                        continue
                    # this appears to copy (origin, text) pairs across to the
                    # new content for any line that matches the last-checked
                    # parent.
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
        if delta:
            if delta_seq is None:
                reference_content = self._get_content(parents[0], parent_texts)
                new_texts = content.text()
                old_texts = reference_content.text()
                delta_seq = patiencediff.PatienceSequenceMatcher(
                    None, old_texts, new_texts)
            return self._make_line_delta(delta_seq, content)

    def _make_line_delta(self, delta_seq, new_content):
        """Generate a line delta from delta_seq and new_content."""
        diff_hunks = []
        for op in delta_seq.get_opcodes():
            if op[0] == 'equal':
                continue
            diff_hunks.append((op[1], op[2], op[4]-op[3],
                new_content._lines[op[3]:op[4]]))
        return diff_hunks
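
    # For instance, the opcode ('replace', 2, 3, 2, 4) becomes the hunk
    # (2, 3, 2, new_content._lines[2:4]): replace old line 2 with two new
    # lines.  (Illustrative opcode only.)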

    def _get_components_positions(self, version_ids):
        """Produce a map of position data for the components of versions.

        This data is intended to be used for retrieving the knit records.

        A dict of version_id to (record_details, index_memo, next, parents) is
        returned.
        method is the way referenced data should be applied.
        index_memo is the handle to pass to the data access to actually get
        the data.
        next is the build-parent of the version, or None for fulltexts.
        parents is the version_ids of the parents of this version.
        """
        component_data = {}
        pending_components = version_ids
        while pending_components:
            build_details = self._index.get_build_details(pending_components)
            current_components = set(pending_components)
            pending_components = set()
            for version_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                    record_details) = details
                method = record_details[0]
                if compression_parent is not None:
                    pending_components.add(compression_parent)
                component_data[version_id] = (record_details, index_memo,
                    compression_parent)
            missing = current_components.difference(build_details)
            if missing:
                raise errors.RevisionNotPresent(missing.pop(), self.filename)
        return component_data

    def _get_content(self, version_id, parent_texts={}):
        """Return a content object for the specified version."""
        cached_version = parent_texts.get(version_id, None)
        if cached_version is not None:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
            return cached_version

        text_map, contents_map = self._get_content_maps([version_id])
        return contents_map[version_id]

    def _check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        self._index.check_versions_present(version_ids)

    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts,
        nostore_sha, random_id, check_content, left_matching_blocks):
        """See VersionedFile.add_lines_with_ghosts()."""
        self._check_add(version_id, lines, random_id, check_content)
        return self._add(version_id, lines, parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _add_lines(self, version_id, parents, lines, parent_texts,
        left_matching_blocks, nostore_sha, random_id, check_content):
        """See VersionedFile.add_lines."""
        self._check_add(version_id, lines, random_id, check_content)
        self._check_versions_present(parents)
        return self._add(version_id, lines[:], parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _check_add(self, version_id, lines, random_id, check_content):
        """Check that version_id and lines are safe to add."""
        if contains_whitespace(version_id):
            raise InvalidRevisionId(version_id, self.filename)
        self.check_not_reserved_id(version_id)
        # Technically this could be avoided if we are happy to allow duplicate
        # id insertion when other things than bzr core insert texts, but it
        # seems useful for folk using the knit api directly to have some safety
        # blanket that we can disable.
        if not random_id and self.has_version(version_id):
            raise RevisionAlreadyPresent(version_id, self.filename)
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def _add(self, version_id, lines, parents, delta, parent_texts,
        left_matching_blocks, nostore_sha, random_id):
        """Add a set of lines on top of version specified by parents.

        If delta is true, compress the text as a line-delta against
        the first parent.

        Any versions not present will be converted into ghosts.
        """
        # first thing, if the content is something we don't need to store, find
        # out.
        line_bytes = ''.join(lines)
        digest = sha_string(line_bytes)
        if nostore_sha == digest:
            raise errors.ExistingContent

        present_parents = []
        if parent_texts is None:
            parent_texts = {}
        for parent in parents:
            if self.has_version(parent):
                present_parents.append(parent)

        # can only compress against the left most present parent.
        if (delta and
            (len(present_parents) == 0 or
             present_parents[0] != parents[0])):
            delta = False

        text_length = len(line_bytes)
        options = []
        if lines:
            if lines[-1][-1] != '\n':
                # copy the contents of lines.
                lines = lines[:]
                options.append('no-eol')
                lines[-1] = lines[-1] + '\n'
                line_bytes += '\n'

        if delta:
            # To speed the extract of texts the delta chain is limited
            # to a fixed number of deltas.  This should minimize both
            # I/O and the time spent applying deltas.
            delta = self._check_should_delta(present_parents)

        content = self.factory.make(lines, version_id)
        if delta or (self.factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                parent_texts, delta, self.factory.annotated,
                left_matching_blocks)

        if delta:
            options.append('line-delta')
            store_lines = self.factory.lower_line_delta(delta_hunks)
            size, bytes = self._data._record_to_data(version_id, digest,
                store_lines)
        else:
            options.append('fulltext')
            # isinstance is slower and we have no hierarchy.
            if self.factory.__class__ == KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                size, bytes = self._data._record_to_data(version_id, digest,
                    lines, [line_bytes])
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self.factory.lower_fulltext(content)
                size, bytes = self._data._record_to_data(version_id, digest,
                    store_lines)

        access_memo = self._data.add_raw_records([size], bytes)[0]
        self._index.add_versions(
            ((version_id, options, access_memo, parents),),
            random_id=random_id)
        return digest, text_length, content
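
    # For instance, adding a text whose final line lacks a trailing newline
    # records the 'no-eol' option and stores the text with a newline
    # appended; extraction later strips it again based on that option.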

    def check(self, progress_bar=None):
        """See VersionedFile.check()."""
        # This doesn't actually test extraction of everything, but that will
        # impact 'bzr check' substantially, and needs to be integrated with
        # care. However, it does check for the obvious problem of a delta with
        # no basis.
        versions = self.versions()
        parent_map = self.get_parent_map(versions)
        for version in versions:
            if self._index.get_method(version) != 'fulltext':
                compression_parent = parent_map[version][0]
                if compression_parent not in parent_map:
                    raise errors.KnitCorrupt(self,
                        "Missing basis parent %s for %s" % (
                        compression_parent, version))

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

    def _get_record_map(self, version_ids):
        """Produce a dictionary of knit records.

        :return: {version_id:(record, record_details, digest, next)}
            record
                data returned from read_records
            record_details
                opaque information to pass to parse_record
            digest
                SHA1 digest of the full text after all steps are done
            next
                build-parent of the version, i.e. the leftmost ancestor.
                Will be None if the record is not a delta.
        """
        position_map = self._get_components_positions(version_ids)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(c, i_m) for c, (r, i_m, n)
                   in position_map.iteritems()]
        record_map = {}
        for component_id, record, digest in \
                self._data.read_records_iter(records):
            (record_details, index_memo, next) = position_map[component_id]
            record_map[component_id] = record, record_details, digest, next
        return record_map

    def get_text(self, version_id):
        """See VersionedFile.get_text()."""
        return self.get_texts([version_id])[0]

    def get_texts(self, version_ids):
        return [''.join(l) for l in self.get_line_list(version_ids)]

    def get_line_list(self, version_ids):
        """Return the texts of listed versions as a list of strings."""
        for version_id in version_ids:
            self.check_not_reserved_id(version_id)
        text_map, content_map = self._get_content_maps(version_ids)
        return [text_map[v] for v in version_ids]

    _get_lf_split_line_list = get_line_list

    def _get_content_maps(self, version_ids):
        """Produce maps of text and KnitContents.

        :return: (text_map, content_map) where text_map contains the texts
            for the requested versions and content_map contains the
            KnitContents.  Both dicts take version_ids as their keys.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number
        # of children that need to apply deltas to it is > 1 or it is part of
        # the final output.
        version_ids = list(version_ids)
        multiple_versions = len(version_ids) != 1
        record_map = self._get_record_map(version_ids)

        text_map = {}
        content_map = {}
        final_content = {}
        for version_id in version_ids:
            components = []
            cursor = version_id
            while cursor is not None:
                record, record_details, digest, next = record_map[cursor]
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    break
                cursor = next

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    content, delta = self.factory.parse_record(version_id,
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content

            content.cleanup_eol(copy_on_mutate=multiple_versions)
            final_content[version_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise KnitCorrupt(self.filename,
                    '\n sha-1 %s'
                    '\n of reconstructed text does not match'
                    '\n expected %s'
                    '\n for version %s' %
                    (actual_sha, digest, version_id))
            text_map[version_id] = text
        return text_map, final_content

    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                index_memo = self._index.get_position(version_id)
                version_id_records.append((version_id, index_memo))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
            enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            elif method == 'line-delta':
                line_iterator = self.factory.get_linedelta_content(data)
            else:
                raise ValueError('invalid method %r' % (method,))
            # XXX: It might be more efficient to yield (version_id,
            # line_iterator) in the future. However for now, this is a simpler
            # change to integrate into the rest of the codebase. RBC 20071110
            for line in line_iterator:
                yield line, version_id

        pb.update('Walking content.', total, total)

    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()

    __len__ = num_versions

    def annotate(self, version_id):
        """See VersionedFile.annotate."""
        return self.factory.annotate(self, version_id)

    def get_parent_map(self, version_ids):
        """See VersionedFile.get_parent_map."""
        return self._index.get_parent_map(version_ids)

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if isinstance(versions, basestring):
            versions = [versions]
        return self._index.get_ancestry(versions, topo_sorted)

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if isinstance(versions, basestring):
            versions = [versions]
        return self._index.get_ancestry_with_ghosts(versions)

    def plan_merge(self, ver_a, ver_b):
        """See VersionedFile.plan_merge."""
        ancestors_b = set(self.get_ancestry(ver_b, topo_sorted=False))
        ancestors_a = set(self.get_ancestry(ver_a, topo_sorted=False))
        annotated_a = self.annotate(ver_a)
        annotated_b = self.annotate(ver_b)
        return merge._plan_annotate_merge(annotated_a, annotated_b,
                                          ancestors_a, ancestors_b)


class _KnitComponentFile(object):
    """One of the files used to implement a knit database."""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename

    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                filename=self._transport.abspath(self._filename))

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self._filename)


class _KnitIndex(_KnitComponentFile):
    """Manages the knit index file.

    The index is kept in memory and read on startup, to enable
    fast lookups of revision information.  The cursor of the index
    file is always pointing to the end, making it easy to append
    entries.

    _cache is a cache for fast mapping from version id to an Index
    object.

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number.  As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id;
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record.  The same revision can be present in an index file more than
    once.  The first occurrence gets assigned a sequence number starting
    from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        that the compressed data starts at.
    LENGTH is the ascii representation of the length of the compressed data.
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
        REVISION_ID.
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.

    When a write to the index file is interrupted, it will result in a line
    that does not end in ' :'.  If the ' :' is not present at the end of a
    line, or at the end of the file, then the record that is missing it will
    be ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write
    was interrupted.  As a result it's normal for the last line in the index
    to be missing a trailing newline.  One can be added with no harmful
    effects.
    """
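
    # An example line in this format (illustrative ids, offsets and flags):
    #
    #   rev-2 line-delta 113 52 0 .ghost-rev :
    #
    # rev-2 is a line-delta whose compressed data starts at byte 113 with
    # length 52; its parents are the revision at sequence number 0 and the
    # ghost 'ghost-rev'.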
1682
HEADER = "# bzr knit index 8\n"
1684
# speed of knit parsing went from 280 ms to 280 ms with slots addition.
1685
# __slots__ = ['_cache', '_history', '_transport', '_filename']
1687
    def _cache_version(self, version_id, options, pos, size, parents):
        """Cache a version record in the history array and index cache.

        This is inlined into _load_data for performance. KEEP IN SYNC.
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
         indexes).
        """
        # only want the _history index to reference the 1st index entry
        # for version_id
        if version_id not in self._cache:
            index = len(self._history)
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)

    def _check_write_ok(self):
        if self._get_scope() != self._scope:
            raise errors.OutSideTransaction()
        if self._mode != 'w':
            raise errors.ReadOnlyObjectDirtiedError(self)

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None,
                 get_scope=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)
        self._scope = get_scope()
        self._get_scope = get_scope

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        pending = set(versions)
        cache = self._cache
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return topo_sort(graph.items())

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # if not completed
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            if version_id not in self._cache:
                # ghosts are omitted
                continue
            method = self.get_method(version_id)
            parents = self.get_parents_with_ghosts(version_id)
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = 'no-eol' in self.get_options(version_id)
            index_memo = self.get_position(version_id)
            result[version_id] = (index_memo, compression_parent,
                                  parents, (method, noeol))
        return result

    def num_versions(self):
        return len(self._history)

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return self._history

    def _version_list_to_index(self, versions):
        result_list = []
        cache = self._cache
        for version in versions:
            if version in cache:
                # -- inlined lookup() --
                result_list.append(str(cache[version][5]))
                # -- end lookup () --
            else:
                result_list.append('.' + version)
        return ' '.join(result_list)

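    # A sketch of the dictionary compression performed above (hypothetical
    # ids): if 'rev-1' is cached at sequence number 3 and 'ghost-1' is
    # absent, _version_list_to_index(['rev-1', 'ghost-1']) returns
    # '3 .ghost-1', i.e. the PARENT_SEQUENCE_ID and PARENT_ID forms from
    # the class docstring.
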
    def add_version(self, version_id, options, index_memo, parents):
        """Add a version record to the index."""
        self.add_versions(((version_id, options, index_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        :param versions: a list of tuples:
            (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, (index, pos, size), parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                lines.append(line)
                self._cache_version(version_id, options, pos, size, tuple(parents))
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise

    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return details needed to access the version.

        .kndx indices do not support split-out data, so return None for the
        index field.

        :return: a tuple (None, data position, size) to hand to the access
            logic to get the record.
        """
        entry = self._cache[version_id]
        return None, entry[2], entry[3]

    def get_method(self, version_id):
        """Return compression method of specified version."""
        try:
            options = self._cache[version_id][1]
        except KeyError:
            raise RevisionNotPresent(version_id, self._filename)
        if 'fulltext' in options:
            return 'fulltext'
        else:
            if 'line-delta' not in options:
                raise errors.KnitIndexUnknownMethod(self._full_path(), options)
            return 'line-delta'

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        return self._cache[version_id][1]

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        for version_id in version_ids:
            try:
                result[version_id] = tuple(self._cache[version_id][4])
            except KeyError:
                pass
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)


class KnitGraphIndex(object):
    """A knit index that builds on GraphIndex."""

    def __init__(self, graph_index, deltas=False, parents=True, add_callback=None):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param deltas: Allow delta-compressed records.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param parents: If True, record knit parents; if not, do not record
            parents.
        """
        self._graph_index = graph_index
        self._deltas = deltas
        self._add_callback = add_callback
        self._parents = parents
        if deltas and not parents:
            raise KnitCorrupt(self, "Cannot do delta compression without "
                "parent tracking.")

    def _check_write_ok(self):
        pass

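    # A minimal wiring sketch (an assumption about typical usage, not code
    # from this module): a KnitGraphIndex is usually layered over a
    # GraphIndex builder whose add_nodes method serves as the add_callback:
    #
    #   from bzrlib.index import InMemoryGraphIndex
    #   builder = InMemoryGraphIndex(reference_lists=2, key_elements=1)
    #   index = KnitGraphIndex(builder, deltas=True, parents=True,
    #                          add_callback=builder.add_nodes)
    #
    # Two reference lists are used because a delta-capable index tracks
    # both logical parents and the compression parent.
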
    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        :param keys: An iterable of index keys; 1-tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise RevisionNotPresent(missing_keys.pop(), self)

    def _present_keys(self, version_ids):
        return set([
            node[1] for node in self._get_entries(version_ids)])

    def _parentless_ancestry(self, versions):
        """Honour the get_ancestry API for parentless knit indices."""
        wanted_keys = self._version_ids_to_keys(versions)
        present_keys = self._present_keys(wanted_keys)
        missing = set(wanted_keys).difference(present_keys)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
        return list(self._keys_to_version_ids(present_keys))

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        ghosts = set()
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            found = set()
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                # don't ask for ghosties - otherwise
                # we can end up looping with pending
                # being entirely ghosted.
                graph[key] = [parent for parent in node_refs[0]
                    if parent not in ghosts]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
                found.add(key)
            ghosts.update(this_iteration.difference(found))
        if versions.difference(graph):
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
        if topo_sorted:
            result_keys = topo_sort(graph.items())
        else:
            result_keys = graph.iterkeys()
        return [key[0] for key in result_keys]

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be an index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                graph[key] = node_refs[0]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
            missing_versions = this_iteration.difference(graph)
            missing_needed = versions.intersection(missing_versions)
            if missing_needed:
                raise RevisionNotPresent(missing_needed.pop(), self)
            for missing_version in missing_versions:
                # add a key, no parents
                graph[missing_version] = []
                pending.discard(missing_version) # don't look for it
        result_keys = topo_sort(graph.items())
        return [key[0] for key in result_keys]

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        entries = self._get_entries(self._version_ids_to_keys(version_ids), True)
        for entry in entries:
            version_id = self._keys_to_version_ids((entry[1],))[0]
            if not self._parents:
                parents = ()
            else:
                parents = self._keys_to_version_ids(entry[3][0])
            if not self._deltas:
                compression_parent = None
            else:
                compression_parent_key = self._compression_parent(entry)
                if compression_parent_key:
                    compression_parent = self._keys_to_version_ids(
                        (compression_parent_key,))[0]
                else:
                    compression_parent = None
            noeol = (entry[2][0] == 'N')
            if compression_parent:
                method = 'line-delta'
            else:
                method = 'fulltext'
            result[version_id] = (self._node_to_position(entry),
                                  compression_parent, parents,
                                  (method, noeol))
        return result

    def _compression_parent(self, an_entry):
        # return the key that an_entry is compressed against, or None
        # Grab the second parent list (as deltas implies parents currently)
        compression_parents = an_entry[3][1]
        if not compression_parents:
            return None
        return compression_parents[0]

    def _get_method(self, node):
        if not self._deltas:
            return 'fulltext'
        if self._compression_parent(node):
            return 'line-delta'
        else:
            return 'fulltext'

    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return [node[1][0] for node in self._graph_index.iter_all_entries()]

    def has_version(self, version_id):
        """True if the version is in the index."""
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1

    def _keys_to_version_ids(self, keys):
        return tuple(key[0] for key in keys)

    def get_position(self, version_id):
        """Return details needed to access the version.

        :return: a tuple (index, data position, size) to hand to the access
            logic to get the record.
        """
        node = self._get_node(version_id)
        return self._node_to_position(node)

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])

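    # Illustrative decoding sketch: the node value is a flag byte ('N' for
    # no-eol, ' ' otherwise) followed by "<pos> <size>", as written by
    # add_versions below. A hypothetical node with value 'N417 82'
    # therefore decodes to the position tuple (node_index, 417, 82).
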
    def get_method(self, version_id):
        """Return compression method of specified version."""
        return self._get_method(self._get_node(version_id))

    def _get_node(self, version_id):
        try:
            return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
        except IndexError:
            raise RevisionNotPresent(version_id, self)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        node = self._get_node(version_id)
        options = [self._get_method(node)]
        if node[2][0] == 'N':
            options.append('no-eol')
        return options

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        nodes = self._get_entries(self._version_ids_to_keys(version_ids))
        result = {}
        if self._parents:
            for node in nodes:
                result[node[1][0]] = self._keys_to_version_ids(node[3][0])
        else:
            for node in nodes:
                result[node[1][0]] = ()
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        keys = self._version_ids_to_keys(version_ids)
        present = self._present_keys(keys)
        missing = keys.difference(present)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)

    def add_version(self, version_id, options, access_memo, parents):
        """Add a version record to the index."""
        return self.add_versions(((version_id, options, access_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param versions: a list of tuples:
            (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        # anymore.
        # check for dups

        keys = {}
        for (version_id, options, access_memo, parents) in versions:
            index, pos, size = access_memo
            key = (version_id, )
            parents = tuple((parent, ) for parent in parents)
            if 'no-eol' in options:
                value = 'N'
            else:
                value = ' '
            value += "%d %d" % (pos, size)
            if not self._deltas:
                if 'line-delta' in options:
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
            if self._parents:
                if self._deltas:
                    if 'line-delta' in options:
                        node_refs = (parents, (parents[0],))
                    else:
                        node_refs = (parents, ())
                else:
                    node_refs = (parents, )
            else:
                if parents:
                    raise KnitCorrupt(self, "attempt to add node with parents "
                        "in parentless index.")
                node_refs = ()
            keys[key] = (value, node_refs)
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                if (value, node_refs) != keys[key]:
                    raise KnitCorrupt(self, "inconsistent details in add_versions"
                        ": %s %s" % ((value, node_refs), keys[key]))
                del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)

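    # Illustrative sketch of a prepared node (hypothetical ids): a no-eol,
    # delta-compressed record of 'rev-2' at pos 417, size 82, with parent
    # 'rev-1', is handed to the callback as
    #
    #   [(('rev-2',), 'N417 82', ((('rev-1',),), (('rev-1',),)))]
    #
    # i.e. key, value, then the (parents, compression-parents) reference
    # lists.
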
    def _version_ids_to_keys(self, version_ids):
        return set((version_id, ) for version_id in version_ids)


class _KnitAccess(object):
    """Access to knit records in a .knit file."""

    def __init__(self, transport, filename, _file_mode, _dir_mode,
        _need_to_create, _create_parent_dir):
        """Create a _KnitAccess for accessing and inserting data.

        :param transport: The transport the .knit is located on.
        :param filename: The filename of the .knit.
        """
        self._transport = transport
        self._filename = filename
        self._file_mode = _file_mode
        self._dir_mode = _dir_mode
        self._need_to_create = _need_to_create
        self._create_parent_dir = _create_parent_dir

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to wherever the access method is storing data.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is always None
            for the .knit access method.
        """
        if not self._need_to_create:
            base = self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                   create_parent_dir=self._create_parent_dir,
                                   mode=self._file_mode,
                                   dir_mode=self._dir_mode)
            self._need_to_create = False
            base = 0
        result = []
        for size in sizes:
            result.append((None, base, size))
            base += size
        return result

    def create(self):
        """IFF this data access has its own storage area, initialise it.

        :return: None.
        """
        self._transport.put_bytes_non_atomic(self._filename, '',
                                             mode=self._file_mode)

    def open_file(self):
        """IFF this data access can be represented as a single file, open it.

        For knits that are not mapped to a single file on disk this will
        always return None.

        :return: None or a file handle.
        """
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The .knit method ignores
            the index as there is always only a single file.
        :return: An iterator over the bytes of the records.
        """
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
        for pos, data in self._transport.readv(self._filename, read_vector):
            yield data


class _PackAccess(object):
    """Access to knit records via a collection of packs."""

    def __init__(self, index_to_packs, writer=None):
        """Create a _PackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param writer: A tuple (pack.ContainerWriter, write_index) which
            contains the pack to write, and the index that reads from it will
            be associated with.
        """
        if writer:
            self.container_writer = writer[0]
            self.write_index = writer[1]
        else:
            self.container_writer = None
            self.write_index = None
        self.indices = index_to_packs

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is the
            write_index object supplied to the PackAccess object.
        """
        result = []
        offset = 0
        for size in sizes:
            p_offset, p_length = self.container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self.write_index, p_offset, p_length))
        return result

    def create(self):
        """Pack based knits do not get individually created."""

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            transport, path = self.indices[index]
            reader = pack.make_readv_reader(transport, path, offsets)
            for names, read_func in reader.iter_records():
                yield read_func(None)

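    # A sketch of the grouping above (hypothetical memos): the request
    #
    #   [(idx_a, 0, 10), (idx_a, 10, 20), (idx_b, 0, 5)]
    #
    # is regrouped into
    #
    #   [(idx_a, [(0, 10), (10, 20)]), (idx_b, [(0, 5)])]
    #
    # so that each pack file is read with a single readv-style pass.
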
    def open_file(self):
        """Pack based knits have no single file."""
        return None

    def set_writer(self, writer, index, (transport, packname)):
        """Set a writer to use for adding data."""
        if index is not None:
            self.indices[index] = (transport, packname)
        self.container_writer = writer
        self.write_index = index


class _StreamAccess(object):
    """A Knit Access object that provides data from a datastream.

    It also provides a fallback to present as unannotated data, annotated data
    from a *backing* access object.

    This is triggered by an index_memo which is pointing to a different index
    than this was constructed with, and is used to allow extracting full
    unannotated texts for insertion into annotated knits.
    """

    def __init__(self, reader_callable, stream_index, backing_knit,
        orig_factory):
        """Create a _StreamAccess object.

        :param reader_callable: The reader_callable from the datastream.
            This is called to buffer all the data immediately, for
            random access.
        :param stream_index: The index the data stream this provides access to
            which will be present in native index_memo's.
        :param backing_knit: The knit object that will provide access to
            annotated texts which are not available in the stream, so as to
            create unannotated texts.
        :param orig_factory: The original content factory used to generate the
            stream. This is used for checking whether the thunk code for
            supporting _copy_texts will generate the correct form of data.
        """
        self.data = reader_callable(None)
        self.stream_index = stream_index
        self.backing_knit = backing_knit
        self.orig_factory = orig_factory

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable of memos from the
            _StreamIndex object identifying bytes to read; for these classes
            they are (from_backing_knit, index, start, end) and can point to
            either the backing knit or streamed data.
        :return: An iterator yielding a byte string for each record in
            memos_for_retrieval.
        """
        # use a generator for memory friendliness
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
            if not from_backing_knit:
                if version_id is not self.stream_index:
                    raise AssertionError()
                yield self.data[start:end]
                continue
            # we have been asked to thunk. This thunking only occurs when
            # we are obtaining plain texts from an annotated backing knit
            # so that _copy_texts will work.
            # We could improve performance here by scanning for where we need
            # to do this and using get_line_list, then interleaving the output
            # as desired. However, for now, this is sufficient.
            if self.orig_factory.__class__ != KnitPlainFactory:
                raise errors.KnitCorrupt(
                    self, 'Bad thunk request %r cannot be backed by %r' %
                        (version_id, self.orig_factory))
            lines = self.backing_knit.get_lines(version_id)
            line_bytes = ''.join(lines)
            digest = sha_string(line_bytes)
            # the packed form of the fulltext always has a trailing newline,
            # even if the actual text does not, unless the file is empty.  the
            # record options including the noeol flag are passed through by
            # _StreamIndex, so this is safe.
            if lines:
                if lines[-1][-1] != '\n':
                    lines[-1] = lines[-1] + '\n'
                    line_bytes += '\n'
            # We want plain data, because we expect to thunk only to allow text
            # extraction.
            size, bytes = self.backing_knit._data._record_to_data(version_id,
                digest, lines, line_bytes)
            yield bytes


class _StreamIndex(object):
    """A Knit Index object that uses the data map from a datastream."""

    def __init__(self, data_list, backing_index):
        """Create a _StreamIndex object.

        :param data_list: The data_list from the datastream.
        :param backing_index: The index which will supply values for nodes
            referenced outside of this stream.
        """
        self.data_list = data_list
        self.backing_index = backing_index
        self._by_version = {}
        pos = 0
        for key, options, length, parents in data_list:
            self._by_version[key] = options, (pos, pos + length), parents
            pos += length

    def get_ancestry(self, versions, topo_sorted):
        """Get an ancestry list for versions."""
        if topo_sorted:
            # Not needed for basic joins
            raise NotImplementedError(self.get_ancestry)
        # get a graph of all the mentioned versions:
        # A little ugly - basically copied from KnitIndex, but don't want to
        # accidentally incorporate too much of that index's code.
        ancestry = set()
        pending = set(versions)
        cache = self._by_version
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][2] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in ancestry])
            ancestry.add(version)
        return list(ancestry)

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque memo that can be passed to _StreamAccess.read_records
                to extract the raw data; for these classes it is
                (from_backing_knit, index, start, end)
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            try:
                method = self.get_method(version_id)
            except errors.RevisionNotPresent:
                # ghosts are omitted
                continue
            parent_ids = self.get_parents_with_ghosts(version_id)
            noeol = ('no-eol' in self.get_options(version_id))
            index_memo = self.get_position(version_id)
            from_backing_knit = index_memo[0]
            if from_backing_knit:
                # texts retrieved from the backing knit are always full texts
                method = 'fulltext'
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parent_ids[0]
            result[version_id] = (index_memo, compression_parent,
                                  parent_ids, (method, noeol))
        return result

    def get_method(self, version_id):
        """Return compression method of specified version."""
        options = self.get_options(version_id)
        if 'fulltext' in options:
            return 'fulltext'
        elif 'line-delta' in options:
            return 'line-delta'
        else:
            raise errors.KnitIndexUnknownMethod(self, options)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        try:
            return self._by_version[version_id][0]
        except KeyError:
            options = list(self.backing_index.get_options(version_id))
            if 'fulltext' in options:
                pass
            elif 'line-delta' in options:
                # Texts from the backing knit are always returned from the
                # stream as full texts
                options.remove('line-delta')
                options.append('fulltext')
            else:
                raise errors.KnitIndexUnknownMethod(self, options)
            return tuple(options)

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        pending_ids = set()
        for version_id in version_ids:
            try:
                result[version_id] = self._by_version[version_id][2]
            except KeyError:
                pending_ids.add(version_id)
        result.update(self.backing_index.get_parent_map(pending_ids))
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def get_position(self, version_id):
        """Return details needed to access the version.

        _StreamAccess has the data as a big array, so we return slice
        coordinates into that (as index_memo's are opaque outside the
        index and matching access class).

        :return: a tuple (from_backing_knit, index, start, end) that can
            be passed e.g. to get_raw_records.
            If from_backing_knit is False, index will be self, otherwise it
            will be a version id.
        """
        try:
            start, end = self._by_version[version_id][1]
            return False, self, start, end
        except KeyError:
            # Signal to the access object to handle this from the backing knit.
            return (True, version_id, None, None)

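    # Summary sketch of the two memo shapes produced above: a record
    # present in the stream yields (False, self, start, end), while a
    # record that has to come from the backing knit yields
    # (True, version_id, None, None); _StreamAccess.get_raw_records
    # branches on the first element.
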
    def get_versions(self):
        """Get all the versions in the stream."""
        return self._by_version.keys()


class _KnitData(object):
    """Manage extraction of data from a KnitAccess, caching and decompressing.

    The KnitData class provides the logic for parsing and using knit records,
    making use of an access method for the low level read and write operations.
    """

    def __init__(self, access):
        """Create a KnitData object.

        :param access: The access method to use. Access methods such as
            _KnitAccess manage the insertion of raw records and the subsequent
            retrieval of the same.
        """
        self._access = access
        self._checked = False

    def _open_file(self):
        return self._access.open_file()

    def _record_to_data(self, version_id, digest, lines, dense_lines=None):
        """Convert version_id, digest, lines into a raw data block.

        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in \n,
            dense_lines may be a list with one line in it, containing all the
            1000 lines and their \n's. Using dense_lines if it is already
            known is a win because the string join to create bytes in this
            function spends less time resizing the final string.
        :return: (len, a bytestring containing the raw data ready to write).
        """
        # Note: using a string copy here increases memory pressure with e.g.
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
        # when doing the initial commit of a mozilla tree. RBC 20070921
        bytes = ''.join(chain(
            ["version %s %d %s\n" % (version_id,
                                     len(lines),
                                     digest)],
            dense_lines or lines,
            ["end %s\n" % version_id]))
        compressed_bytes = bytes_to_gzip(bytes)
        return len(compressed_bytes), compressed_bytes

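    # Illustrative sketch of the uncompressed record layout assembled
    # above, for a hypothetical two-line text:
    #
    #   version rev-1 2 <sha1-of-expanded-text>
    #   first line
    #   second line
    #   end rev-1
    #
    # The whole block is then gzipped via bytes_to_gzip before storage.
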
    def add_raw_records(self, sizes, raw_data):
        """Append a prepared record to the data file.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: a list of index data for the way the data was stored.
            See the access method add_raw_records documentation for more
            details.
        """
        return self._access.add_raw_records(sizes, raw_data)

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream.
            as (stream, header_record)
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        try:
            rec = self._check_header(version_id, df.readline())
        except Exception, e:
            raise KnitCorrupt(self._access,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        return df, rec

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                              'unexpected number of elements in record header')
        return rec

    def _check_header_version(self, rec, version_id):
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))

    def _check_header(self, version_id, line):
        rec = self._split_header(line)
        self._check_header_version(rec, version_id)
        return rec

    def _parse_record_unchecked(self, data):
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))
        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 rec[1]))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, rec[1]))
        df.close()
        return rec, record_contents

    def _parse_record(self, version_id, data):
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.

        Each item the iterator yields is (version_id, bytes,
            sha1_of_full_text).
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            needed_offsets = [index_memo for version_id, index_memo
                                         in records]
            raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            data = raw_records.next()
            # validate the header
            df, rec = self._parse_record_header(version_id, data)
            df.close()
            yield version_id, data, rec[3]

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.

        :param records: A list of (version_id, index_memo) entries
        :return: Yields (version_id, contents, digest) in the order
            read, not the order requested
        """
        if not records:
            return

        needed_records = sorted(set(records), key=operator.itemgetter(1))
        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for version_id, index_memo in needed_records])

        for (version_id, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(version_id, data)
            yield version_id, content, digest

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components


class InterKnit(InterVersionedFile):
    """Optimised code paths for knit to knit operations."""

    _matching_file_from_factory = staticmethod(make_file_knit)
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with knits."""
        try:
            return (isinstance(source, KnitVersionedFile) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
        """Copy texts to the target by extracting and adding them one by one.

        see join() for the parameter definitions.
        """
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        # --- the below is factorable out with VersionedFile.join, but wait for
        # VersionedFiles, it may all be simpler then.
        graph = Graph(self.source)
        search = graph._make_breadth_first_searcher(version_ids)
        transitive_ids = set()
        map(transitive_ids.update, list(search))
        parent_map = self.source.get_parent_map(transitive_ids)
        order = topo_sort(parent_map.items())

        def size_of_content(content):
            return sum(len(line) for line in content.text())
        # Cache at most 10MB of parent texts
        parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                              compute_size=size_of_content)
        # TODO: jam 20071116 It would be nice to have a streaming interface to
        #       get multiple texts from a source. The source could be smarter
        #       about how it handled intermediate stages.
        #       get_line_list() or make_mpdiffs() seem like a possibility, but
        #       at the moment they extract all full texts into memory, which
        #       causes us to store more than our 3x fulltext goal.
        #       Repository.iter_files_bytes() may be another possibility
        to_process = [version for version in order
                      if version not in self.target]
        total = len(to_process)
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for index, version in enumerate(to_process):
                pb.update('Converting versioned data', index, total)
                sha1, num_bytes, parent_text = self.target.add_lines(version,
                    self.source.get_parents_with_ghosts(version),
                    self.source.get_lines(version),
                    parent_texts=parent_cache)
                parent_cache[version] = parent_text
        finally:
            pb.finished()
        return total

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        # If the source and target are mismatched w.r.t. annotations vs
        # plain, the data needs to be converted accordingly
        if self.source.factory.annotated == self.target.factory.annotated:
            converter = None
        elif self.source.factory.annotated:
            converter = self._anno_to_plain_converter
        else:
            # We're converting from a plain to an annotated knit. Copy them
            # across by full texts.
            return self._copy_texts(pb, msg, version_ids, ignore_missing)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)
            if None in version_ids:
                version_ids.remove(None)

            self.source_ancestry = set(self.source.get_ancestry(version_ids,
                topo_sorted=False))
            this_versions = set(self.target._index.get_versions())
            # XXX: For efficiency we should not look at the whole index,
            #      we only need to consider the referenced revisions - they
            #      must all be present, or the method must be full-text.
            #      TODO, RBC 20070919
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # plan the join:
            copy_queue = []
            copy_queue_records = []
            copy_set = set()
            for version_id in version_list:
                options = self.source._index.get_options(version_id)
                parents = self.source._index.get_parents_with_ghosts(version_id)
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must:
                    # * already have it or
                    # * have it scheduled already
                    # otherwise we don't care
                    if not (self.target.has_version(parent) or
                            parent in copy_set or
                            not self.source.has_version(parent)):
                        raise AssertionError("problem joining parent %r "
                            "from %r to %r"
                            % (parent, self.source, self.target))
                index_memo = self.source._index.get_position(version_id)
                copy_queue_records.append((version_id, index_memo))
                copy_queue.append((version_id, options, parents))
                copy_set.add(version_id)

            # data suck the join:
            count = 0
            total = len(version_list)
            raw_datum = []
            raw_records = []
            for (version_id, raw_data, _), \
                (version_id2, options, parents) in \
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
                     copy_queue):
                if not (version_id == version_id2):
                    raise AssertionError('logic error, inconsistent results')
                count = count + 1
                pb.update("Joining knit", count, total)
                if converter:
                    size, raw_data = converter(raw_data, version_id, options,
                        parents)
                else:
                    size = len(raw_data)
                raw_records.append((version_id, options, parents, size))
                raw_datum.append(raw_data)
            self.target._add_raw_records(raw_records, ''.join(raw_datum))
            return count
        finally:
            pb.finished()

    def _anno_to_plain_converter(self, raw_data, version_id, options,
                                 parents):
        """Convert annotated content to plain content."""
        data, digest = self.source._data._parse_record(version_id, raw_data)
        if 'fulltext' in options:
            content = self.source.factory.parse_fulltext(data, version_id)
            lines = self.target.factory.lower_fulltext(content)
        else:
            delta = self.source.factory.parse_line_delta(data, version_id,
                plain=True)
            lines = self.target.factory.lower_line_delta(delta)
        return self.target._data._record_to_data(version_id, digest, lines)


InterVersionedFile.register_optimiser(InterKnit)


class WeaveToKnit(InterVersionedFile):
    """Optimised code paths for weave to knit operations."""

    _matching_file_from_factory = bzrlib.weave.WeaveFile
    _matching_file_to_factory = staticmethod(make_file_knit)

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with weaves to knits."""
        try:
            return (isinstance(source, bzrlib.weave.Weave) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)

            self.source_ancestry = set(self.source.get_ancestry(version_ids))
            this_versions = set(self.target._index.get_versions())
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # do the join:
            count = 0
            total = len(version_list)
            parent_map = self.source.get_parent_map(version_list)
            for version_id in version_list:
                pb.update("Converting to knit", count, total)
                parents = parent_map[version_id]
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must already have it
                    if not self.target.has_version(parent):
                        raise AssertionError("%r does not have parent %r"
                            % (self.target, parent))
                self.target.add_lines(
                    version_id, parents, self.source.get_lines(version_id))
                count = count + 1
            return count
        finally:
            pb.finished()


InterVersionedFile.register_optimiser(WeaveToKnit)


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

    This implementation is for knits with no cached annotations.
    It will work for knits with cached annotations, but this is not
    recommended.
    """
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate(revision_id))

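# Usage sketch (an assumption, not from the original file): given an
# existing knit 'knit' containing version 'rev-id', the iterator yields
# (origin, line) pairs:
#
#   for origin, line in annotate_knit(knit, 'rev-id'):
#       print '%s: %s' % (origin, line),
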

class _KnitAnnotator(object):
    """Build up the annotations for a text."""

    def __init__(self, knit):
        self._knit = knit

        # Content objects, differs from fulltexts because of how final newlines
        # are treated by knits. the content objects here will always have a
        # final newline
        self._fulltext_contents = {}

        # Annotated lines of specific revisions
        self._annotated_lines = {}

        # Track the raw data for nodes that we could not process yet.
        # This maps the revision_id of the base to a list of children that will
        # be annotated from it.
        self._pending_children = {}

        # Nodes which cannot be extracted
        self._ghosts = set()

        # Track how many children this node has, so we know if we need to keep
        # it
        self._annotate_children = {}
        self._compression_children = {}

        self._all_build_details = {}
        # The children => parent revision_id graph
        self._revision_id_graph = {}

        self._heads_provider = None

        self._nodes_to_keep_annotations = set()
        self._generations_until_keep = 100

    def set_generations_until_keep(self, value):
        """Set the number of generations before caching a node.

        Setting this to -1 will cache every merge node, setting this higher
        will cache fewer nodes.
        """
        self._generations_until_keep = value

    def _add_fulltext_content(self, revision_id, content_obj):
        self._fulltext_contents[revision_id] = content_obj
        # TODO: jam 20080305 It might be good to check the sha1digest here
        return content_obj.text()

    def _check_parents(self, child, nodes_to_annotate):
        """Check if all parents have been processed.

        :param child: A tuple of (rev_id, parents, raw_content)
        :param nodes_to_annotate: If child is ready, add it to
            nodes_to_annotate, otherwise put it back in self._pending_children
        """
        for parent_id in child[1]:
            if (parent_id not in self._annotated_lines):
                # This parent is not yet annotated, so defer the child until
                # the parent arrives
                self._pending_children.setdefault(parent_id,
                                                  []).append(child)
                break
        else:
            # This one is ready to be processed
            nodes_to_annotate.append(child)

    def _add_annotation(self, revision_id, fulltext, parent_ids,
                        left_matching_blocks=None):
        """Add an annotation entry.

        All parents should already have been annotated.
        :return: A list of children that now have their parents satisfied.
        """
        a = self._annotated_lines
        annotated_parent_lines = [a[p] for p in parent_ids]
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
            fulltext, revision_id, left_matching_blocks,
            heads_provider=self._get_heads_provider()))
        self._annotated_lines[revision_id] = annotated_lines
        for p in parent_ids:
            ann_children = self._annotate_children[p]
            ann_children.remove(revision_id)
            if (not ann_children
                and p not in self._nodes_to_keep_annotations):
                del self._annotated_lines[p]
                del self._all_build_details[p]
                if p in self._fulltext_contents:
                    del self._fulltext_contents[p]
        # Now that we've added this one, see if there are any pending
        # deltas to be done, certainly this parent is finished
        nodes_to_annotate = []
        for child in self._pending_children.pop(revision_id, []):
            self._check_parents(child, nodes_to_annotate)
        return nodes_to_annotate

    def _get_build_graph(self, revision_id):
        """Get the graphs for building texts and annotations.

        The data you need for creating a full text may be different than the
        data you need to annotate that text. (At a minimum, you need both
        parents to create an annotation, but only need 1 parent to generate the
        fulltext.)

        :return: A list of (revision_id, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data from
            the pack file.
        """
        if revision_id in self._annotated_lines:
            # Nothing to do
            return []
        pending = set([revision_id])
        records = []
        generation = 0
        kept_generation = 0
        while pending:
            # get all pending nodes
            generation += 1
            this_iteration = pending
            build_details = self._knit._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._knit._index._get_entries(this_iteration)
            pending = set()
            for rev_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                self._revision_id_graph[rev_id] = parents
                records.append((rev_id, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update(p for p in parents
                                 if p not in self._all_build_details)
                if compression_parent:
                    self._compression_children.setdefault(compression_parent,
                        []).append(rev_id)
                if parents:
                    for parent in parents:
                        self._annotate_children.setdefault(parent,
                            []).append(rev_id)
                    num_gens = generation - kept_generation
                    if ((num_gens >= self._generations_until_keep)
                        and len(parents) > 1):
                        kept_generation = generation
                        self._nodes_to_keep_annotations.add(rev_id)

            missing_versions = this_iteration.difference(build_details.keys())
            self._ghosts.update(missing_versions)
            for missing_version in missing_versions:
                # add a key, no parents
                self._revision_id_graph[missing_version] = ()
                pending.discard(missing_version) # don't look for it
        if self._ghosts.intersection(self._compression_children):
            raise KnitCorrupt(
                "We cannot have nodes which have a ghost compression parent:\n"
                "ghosts: %r\n"
                "compression children: %r"
                % (self._ghosts, self._compression_children))
        # Cleanout anything that depends on a ghost so that we don't wait for
        # the ghost to show up
        for node in self._ghosts:
            if node in self._annotate_children:
                # We won't be building this node
                del self._annotate_children[node]
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records

    def _annotate_records(self, records):
        """Build the annotations for the listed records."""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        for (rev_id, record,
             digest) in self._knit._data.read_records_iter(records):
            if rev_id in self._annotated_lines:
                continue
            parent_ids = self._revision_id_graph[rev_id]
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
            details = self._all_build_details[rev_id]
            (index_memo, compression_parent, parents,
             record_details) = details
            nodes_to_annotate = []
            # TODO: Remove the punning between compression parents, and
            #       parent_ids, we should be able to do this without assuming
            #       the fulltext parent is also the compression parent
            if len(parent_ids) == 0:
                # There are no parents for this node, so just add it
                # TODO: This probably needs to be decoupled
                fulltext_content, delta = self._knit.factory.parse_record(
                    rev_id, record, record_details, None)
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
                    parent_ids, left_matching_blocks=None))
            else:
                child = (rev_id, parent_ids, record)
                # Check if all the parents are present
                self._check_parents(child, nodes_to_annotate)
            while nodes_to_annotate:
                # Should we use a queue here instead of a stack?
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
                (index_memo, compression_parent, parents,
                 record_details) = self._all_build_details[rev_id]
                if compression_parent is not None:
                    comp_children = self._compression_children[compression_parent]
                    if rev_id not in comp_children:
                        raise AssertionError("%r not in compression children %r"
                            % (rev_id, comp_children))
                    # If there is only 1 child, it is safe to reuse this
                    # content
                    reuse_content = (len(comp_children) == 1
                        and compression_parent not in
                            self._nodes_to_keep_annotations)
                    if reuse_content:
                        # Remove it from the cache since it will be changing
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
                        # Make sure to copy the fulltext since it might be
                        # modified
                        parent_fulltext = list(parent_fulltext_content.text())
                    else:
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
                        parent_fulltext = parent_fulltext_content.text()
                    comp_children.remove(rev_id)
                    fulltext_content, delta = self._knit.factory.parse_record(
                        rev_id, record, record_details,
                        parent_fulltext_content,
                        copy_base_content=(not reuse_content))
                    fulltext = self._add_fulltext_content(rev_id,
                                                          fulltext_content)
                    blocks = KnitContent.get_line_delta_blocks(delta,
                            parent_fulltext, fulltext)
                else:
                    fulltext_content = self._knit.factory.parse_fulltext(
                        record, rev_id)
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                    blocks = None
                nodes_to_annotate.extend(
                    self._add_annotation(rev_id, fulltext, parent_ids,
                                         left_matching_blocks=blocks))

    def _get_heads_provider(self):
        """Create a heads provider for resolving ancestry issues."""
        if self._heads_provider is not None:
            return self._heads_provider
        parent_provider = _mod_graph.DictParentsProvider(
            self._revision_id_graph)
        graph_obj = _mod_graph.Graph(parent_provider)
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
        self._heads_provider = head_cache
        return head_cache

    def annotate(self, revision_id):
        """Return the annotated fulltext at the given revision.

        :param revision_id: The revision id for this file
        """
        records = self._get_build_graph(revision_id)
        if revision_id in self._ghosts:
            raise errors.RevisionNotPresent(revision_id, self._knit)
        self._annotate_records(records)
        return self._annotated_lines[revision_id]


try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data