# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append only
updates.

Knit file layout:
lifeless: the data file is made up of "delta records".  each delta record has a delta header
that contains; (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8 e.set('executable', 'yes')
8 if elt.get('executable') == 'yes':
8 ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation

"""
# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?
from cStringIO import StringIO
from itertools import izip, chain
from zlib import Z_DEFAULT_COMPRESSION

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    debug,
    diff,
    errors,
    merge,
    patiencediff,
    progress,
    trace,
    tsort,
    )
""")
from bzrlib.errors import (
    InvalidRevisionId,
    KnitCorrupt,
    KnitHeaderError,
    RevisionNotPresent,
    RevisionAlreadyPresent,
    )
from bzrlib.graph import Graph
from bzrlib.osutils import (
    contains_whitespace,
    sha_string,
    sha_strings,
    )
from bzrlib.symbol_versioning import (
    deprecated_method,
    DEPRECATED_PARAMETER,
    one_four,
    )
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'


class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""

    def __init__(self):
        self._should_strip_eol = False

    def annotate(self):
        """Return a list of (origin, text) tuples."""
        return list(self.annotate_iter())

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)

    def cleanup_eol(self, copy_on_mutate=True):
        if self._should_strip_eol:
            if copy_on_mutate:
                self._lines = self._lines[:]
            self.strip_last_line_newline()

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'equal':
                continue
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
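
    # Example (illustrative): with old text ['a\n', 'b\n'] and new text
    # ['a\n', 'c\n'], line_delta() produces the single hunk
    # (1, 2, 1, ['c\n']): replace old lines [1:2) with one new line.  Hunk
    # offsets always refer to the old line numbering.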

    @staticmethod
    def get_line_delta_blocks(knit_delta, source, target):
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
        target_len = len(target)
        s_pos = 0
        t_pos = 0
        for s_begin, s_end, t_len, new_text in knit_delta:
            true_n = s_begin - s_pos
            n = true_n
            if n > 0:
                # knit deltas do not provide reliable info about whether the
                # last line of a file matches, due to eol handling.
                if source[s_pos + n - 1] != target[t_pos + n - 1]:
                    n -= 1
                if n > 0:
                    yield s_pos, t_pos, n
            t_pos += t_len + true_n
            s_pos = s_end
        n = target_len - t_pos
        if n > 0:
            if source[s_pos + n - 1] != target[t_pos + n - 1]:
                n -= 1
            if n > 0:
                yield s_pos, t_pos, n
        yield s_pos + (target_len - t_pos), target_len, 0


class AnnotatedKnitContent(KnitContent):
    """Annotated content."""

    def __init__(self, lines):
        KnitContent.__init__(self)
        self._lines = lines

    def annotate_iter(self):
        """Yield tuples of (origin, text) for each content line."""
        return iter(self._lines)

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count

    def strip_last_line_newline(self):
        line = self._lines[-1][1].rstrip('\n')
        self._lines[-1] = (self._lines[-1][0], line)
        self._should_strip_eol = False

    def text(self):
        try:
            lines = [text for origin, text in self._lines]
        except ValueError, e:
            # most commonly (only?) caused by the internal form of the knit
            # missing annotation information because of a bug - see thread
            # around 20071015
            raise KnitCorrupt(self,
                "line in annotated knit missing annotation information: %s"
                % (e,))
        if self._should_strip_eol:
            anno, line = lines[-1]
            lines[-1] = (anno, line.rstrip('\n'))
        return lines

    def copy(self):
        return AnnotatedKnitContent(self._lines[:])


class PlainKnitContent(KnitContent):
    """Unannotated content.

    When annotate[_iter] is called on this content, the same version is reported
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
    objects.
    """

    def __init__(self, lines, version_id):
        KnitContent.__init__(self)
        self._lines = lines
        self._version_id = version_id

    def annotate_iter(self):
        """Yield tuples of (origin, text) for each content line."""
        for line in self._lines:
            yield self._version_id, line

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        offset = 0
        lines = self._lines
        for start, end, count, delta_lines in delta:
            lines[offset+start:offset+end] = delta_lines
            offset = offset + (start - end) + count
        self._version_id = new_version_id

    def copy(self):
        return PlainKnitContent(self._lines[:], self._version_id)

    def strip_last_line_newline(self):
        self._lines[-1] = self._lines[-1].rstrip('\n')
        self._should_strip_eol = False

    def text(self):
        lines = self._lines
        if self._should_strip_eol:
            lines = lines[:]
            lines[-1] = lines[-1].rstrip('\n')
        return lines
288
class _KnitFactory(object):
289
"""Base class for common Factory functions."""
291
def parse_record(self, version_id, record, record_details,
292
base_content, copy_base_content=True):
293
"""Parse a record into a full content object.
295
:param version_id: The official version id for this content
296
:param record: The data returned by read_records_iter()
297
:param record_details: Details about the record returned by
299
:param base_content: If get_build_details returns a compression_parent,
300
you must return a base_content here, else use None
301
:param copy_base_content: When building from the base_content, decide
302
you can either copy it and return a new object, or modify it in
304
:return: (content, delta) A Content object and possibly a line-delta,
307
method, noeol = record_details
308
if method == 'line-delta':
309
assert base_content is not None
310
if copy_base_content:
311
content = base_content.copy()
313
content = base_content
314
delta = self.parse_line_delta(record, version_id)
315
content.apply_delta(delta, version_id)
317
content = self.parse_fulltext(record, version_id)
319
content._should_strip_eol = noeol
320
return (content, delta)
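
    # Example (illustrative): record_details of ('line-delta', False) with a
    # base_content applies the parsed delta to (a copy of) the base, while
    # ('fulltext', True) parses the record directly and marks the resulting
    # content to have its trailing newline stripped.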


class KnitAnnotateFactory(_KnitFactory):
    """Factory for creating annotated Content objects."""

    annotated = True

    def make(self, lines, version_id):
        num_lines = len(lines)
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))

    def parse_fulltext(self, content, version_id):
        """Convert fulltext to internal representation

        fulltext content is of the format
        revid(utf8) plaintext\n
        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return AnnotatedKnitContent(lines)

    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))

    def parse_line_delta(self, lines, version_id, plain=False):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])

        :param plain: If True, the lines are returned as a plain
            list without annotations, not as a list of (origin, content) tuples, i.e.
            (start, end, count, [1..count newline])
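
        For example (an illustrative hunk, not real data), the serialized
        form::

            1,2,1
            rev-123 replacement line

        parses to (1, 2, 1, [('rev-123', 'replacement line\n')]), or to
        (1, 2, 1, ['replacement line\n']) when plain is True.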
        """
        result = []
        lines = iter(lines)
        next = lines.next
        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text
        # walk through the lines parsing.
        # Note that the plain test is explicitly pulled out of the
        # loop to minimise any performance impact
        if plain:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
                result.append((start, end, count, contents))
        else:
            for header in lines:
                start, end, count = [int(n) for n in header.split(',')]
                contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
                result.append((start, end, count, contents))
        return result

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text

    def lower_fulltext(self, content):
        """convert a fulltext content record into a serializable form.

        see parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out

    def annotate_iter(self, knit, version_id):
        content = knit._get_content(version_id)
        return content.annotate_iter()


class KnitPlainFactory(_KnitFactory):
    """Factory for creating plain Content objects."""

    annotated = False

    def make(self, lines, version_id):
        return PlainKnitContent(lines, version_id)

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, lines[cur:cur+c]
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()

    def lower_fulltext(self, content):
        return content.text()

    def lower_line_delta(self, delta):
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(lines)
        return out

    def annotate_iter(self, knit, version_id):
        annotator = _KnitAnnotator(knit)
        return iter(annotator.annotate(version_id))


def make_empty_knit(transport, relpath):
    """Construct an empty knit at the specified location."""
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)


class KnitVersionedFile(VersionedFile):
    """Weave-like structure with faster random access.

    A knit stores a number of texts and a summary of the relationships
    between them.  Texts are identified by a string version-id.  Texts
    are normally stored and retrieved as a series of lines, but can
    also be passed as single strings.

    Lines are stored with the trailing newline (if any) included, to
    avoid special cases for files with no final newline.  Lines are
    composed of 8-bit characters, not unicode.  The combination of
    these approaches should mean any 'binary' file can be safely
    stored and retrieved.
    """

    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
                 factory=None, delta=True, create=False, create_parent_dir=False,
                 delay_create=False, dir_mode=None, index=None, access_method=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        :param index: An index to use for the knit.
        """
        if access_mode is None:
            access_mode = 'w'
        super(KnitVersionedFile, self).__init__(access_mode)
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.writable = (access_mode == 'w')
        self.delta = delta

        self._max_delta_chain = 200

        if index is None:
            self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
                access_mode, create=create, file_mode=file_mode,
                create_parent_dir=create_parent_dir, delay_create=delay_create,
                dir_mode=dir_mode)
        else:
            self._index = index
        if access_method is None:
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
                ((create and not len(self)) and delay_create), create_parent_dir)
        else:
            _access = access_method
        if create and not len(self) and not delay_create:
            _access.create()
        self._data = _KnitData(_access)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
                           self.transport.abspath(self.filename))

    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext. It searches for _max_delta_chain parents. When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            index, pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parent_map([parent])[parent]
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size
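
    # Example: if the nearest fulltext ancestor is 1000 bytes and the deltas
    # between it and the new text total 600 bytes, 1000 > 600, so another
    # delta is still cheaper than storing a new fulltext.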

    def _add_raw_records(self, records, data):
        """Add all the records 'records' with data pre-joined in 'data'.

        :param records: A list of tuples(version_id, options, parents, size).
        :param data: The data for the records. When it is written, the records
                     are adjusted to have pos pointing into data by the sum of
                     the preceding records sizes.
        """
        # write all the data
        raw_record_sizes = [record[3] for record in records]
        positions = self._data.add_raw_records(raw_record_sizes, data)
        offset = 0
        index_entries = []
        for (version_id, options, parents, size), access_memo in zip(
                records, positions):
            index_entries.append((version_id, options, access_memo, parents))
            if self._data._do_cache:
                self._data._cache[version_id] = data[offset:offset+size]
            offset += size
        self._index.add_versions(index_entries)

    def enable_cache(self):
        """Start caching data for this knit"""
        self._data.enable_cache()

    def clear_cache(self):
        """Clear the data cache only."""
        self._data.clear_cache()

    def copy_to(self, name, transport):
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
                self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

    def create_empty(self, name, transport, mode=None):
        return KnitVersionedFile(name, transport, factory=self.factory,
                                 delta=self.delta, create=True)

    def get_data_stream(self, required_versions):
        """Get a data stream for the specified versions.

        Versions may be returned in any order, not necessarily the order
        specified.  They are returned in a partial order by compression
        parent, so that the deltas can be applied as the data stream is
        inserted; however note that compression parents will not be sent
        unless they were specifically requested, as the client may already
        have them.

        :param required_versions: The exact set of versions to be extracted.
            Unlike some other knit methods, this is not used to generate a
            transitive closure, rather it is used precisely as given.

        :returns: format_signature, list of (version, options, length, parents),
            reader_callable.
        """
        required_version_set = frozenset(required_versions)
        version_index = {}
        # list of revisions that can just be sent without waiting for their
        # compression parent
        ready_to_send = []
        # map from revision to the children based on it
        deferred = {}
        # first, read all relevant index data, enough to sort into the right
        # order to return
        for version_id in required_versions:
            options = self._index.get_options(version_id)
            parents = self._index.get_parents_with_ghosts(version_id)
            index_memo = self._index.get_position(version_id)
            version_index[version_id] = (index_memo, options, parents)
            if ('line-delta' in options
                and parents[0] in required_version_set):
                # must wait until the parent has been sent
                deferred.setdefault(parents[0], []). \
                    append(version_id)
            else:
                # either a fulltext, or a delta whose parent the client did
                # not ask for and presumably already has
                ready_to_send.append(version_id)
        # build a list of results to return, plus instructions for data to
        # read from the file
        copy_queue_records = []
        temp_version_list = []
        while ready_to_send:
            # XXX: pushing and popping lists may be a bit inefficient
            version_id = ready_to_send.pop(0)
            (index_memo, options, parents) = version_index[version_id]
            copy_queue_records.append((version_id, index_memo))
            none, data_pos, data_size = index_memo
            temp_version_list.append((version_id, options, data_size,
                parents))
            if version_id in deferred:
                # now we can send all the children of this revision - we could
                # put them in anywhere, but we hope that sending them soon
                # after the fulltext will give good locality in the receiver
                ready_to_send[:0] = deferred.pop(version_id)
        assert len(deferred) == 0, \
            "Still have compressed child versions waiting to be sent"
        # XXX: The stream format is such that we cannot stream it - we have to
        # know the length of all the data a-priori.
        raw_datum = []
        result_version_list = []
        for (version_id, raw_data), \
            (version_id2, options, _, parents) in \
            izip(self._data.read_records_iter_raw(copy_queue_records),
                 temp_version_list):
            assert version_id == version_id2, \
                'logic error, inconsistent results'
            raw_datum.append(raw_data)
            result_version_list.append(
                (version_id, options, len(raw_data), parents))
        # provide a callback to get data incrementally.
        pseudo_file = StringIO(''.join(raw_datum))
        def read(length):
            if length is None:
                return pseudo_file.read()
            else:
                return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)
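
    # Illustrative usage (version ids are hypothetical): the returned triple
    # can be fed straight into insert_data_stream on another knit:
    #     stream = source.get_data_stream(['version-a', 'version-b'])
    #     target.insert_data_stream(stream)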

    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None
        parent, sha1, noeol, delta = self.get_delta(version_id)
        return KnitContent.get_line_delta_blocks(delta, source, target)

    def get_delta(self, version_id):
        """Get a delta for constructing version from some other version."""
        self.check_not_reserved_id(version_id)
        parents = self.get_parent_map([version_id])[version_id]
        if len(parents):
            parent = parents[0]
        else:
            parent = None
        index_memo = self._index.get_position(version_id)
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
        noeol = 'no-eol' in self._index.get_options(version_id)
        if 'fulltext' == self._index.get_method(version_id):
            new_content = self.factory.parse_fulltext(data, version_id)
            if parent is not None:
                reference_content = self._get_content(parent)
                old_texts = reference_content.text()
            else:
                old_texts = []
            new_texts = new_content.text()
            delta_seq = patiencediff.PatienceSequenceMatcher(None, old_texts,
                new_texts)
            return parent, sha1, noeol, self._make_line_delta(delta_seq, new_content)
        else:
            delta = self.factory.parse_line_delta(data, version_id)
            return parent, sha1, noeol, delta

    def get_format_signature(self):
        """See VersionedFile.get_format_signature()."""
        if self.factory.annotated:
            annotated_part = "annotated"
        else:
            annotated_part = "plain"
        return "knit-%s" % (annotated_part,)

    @deprecated_method(one_four)
    def get_graph_with_ghosts(self):
        """See VersionedFile.get_graph_with_ghosts()."""
        return self.get_parent_map(self.versions())

    def get_sha1(self, version_id):
        return self.get_sha1s([version_id])[0]

    def get_sha1s(self, version_ids):
        """See VersionedFile.get_sha1()."""
        record_map = self._get_record_map(version_ids)
        # record entry 2 is the 'digest'.
        return [record_map[v][2] for v in version_ids]

    @staticmethod
    def get_suffixes():
        """See VersionedFile.get_suffixes()."""
        return [DATA_SUFFIX, INDEX_SUFFIX]

    @deprecated_method(one_four)
    def has_ghost(self, version_id):
        """True if there is a ghost reference in the file to version_id."""
        # maybe we have it
        if self.has_version(version_id):
            return False
        # optimisable if needed by memoising the _ghosts set.
        items = self.get_parent_map(self.versions())
        for parents in items.itervalues():
            for parent in parents:
                if parent == version_id and parent not in items:
                    return True
        return False

    def insert_data_stream(self, (format, data_list, reader_callable)):
        """Insert knit records from a data stream into this knit.

        If a version in the stream is already present in this knit, it will not
        be inserted a second time. It will be checked for consistency with the
        stored version however, and may cause a KnitCorrupt error to be raised
        if the data in the stream disagrees with the already stored data.

        :seealso: get_data_stream
        """
        if format != self.get_format_signature():
            if 'knit' in debug.debug_flags:
                trace.mutter(
                    'incompatible format signature inserting to %r', self)
            source = self._knit_from_datastream(
                (format, data_list, reader_callable))
            self.join(source)
            return

        for version_id, options, length, parents in data_list:
            if self.has_version(version_id):
                # First check: the list of parents.
                my_parents = self.get_parents_with_ghosts(version_id)
                if tuple(my_parents) != tuple(parents):
                    # XXX: KnitCorrupt is not quite the right exception here.
                    raise KnitCorrupt(
                        self.filename,
                        'parents list %r from data stream does not match '
                        'already recorded parents %r for %s'
                        % (parents, my_parents, version_id))

                # Also check the SHA-1 of the fulltext this content will
                # produce.
                raw_data = reader_callable(length)
                my_fulltext_sha1 = self.get_sha1(version_id)
                df, rec = self._data._parse_record_header(version_id, raw_data)
                stream_fulltext_sha1 = rec[3]
                if my_fulltext_sha1 != stream_fulltext_sha1:
                    # Actually, we don't know if it's this knit that's corrupt,
                    # or the data stream we're trying to insert.
                    raise KnitCorrupt(
                        self.filename, 'sha-1 does not match %s' % version_id)
            else:
                if 'line-delta' in options:
                    # Make sure that this knit record is actually useful: a
                    # line-delta is no use unless we have its parent.
                    # Fetching from a broken repository with this problem
                    # shouldn't break the target repository.
                    #
                    # See https://bugs.launchpad.net/bzr/+bug/164443
                    if not self._index.has_version(parents[0]):
                        raise KnitCorrupt(
                            self.filename,
                            'line-delta from stream '
                            'for version %s '
                            'references '
                            'missing parent %s\n'
                            'Try running "bzr check" '
                            'on the source repository, and "bzr reconcile" '
                            'if necessary.' %
                            (version_id, parents[0]))
                self._add_raw_records(
                    [(version_id, options, parents, length)],
                    reader_callable(length))

    def _knit_from_datastream(self, (format, data_list, reader_callable)):
        """Create a knit object from a data stream.

        This method exists to allow conversion of data streams that do not
        match the signature of this knit. Generally it will be slower and use
        more memory to use this method to insert data, but it will work.

        :seealso: get_data_stream for details on datastreams.
        :return: A knit versioned file which can be used to join the datastream
            into self.
        """
        if format == "knit-plain":
            factory = KnitPlainFactory()
        elif format == "knit-annotated":
            factory = KnitAnnotateFactory()
        else:
            raise errors.KnitDataStreamUnknown(format)
        index = _StreamIndex(data_list, self._index)
        access = _StreamAccess(reader_callable, index, self, factory)
        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "versions scales with size of history")
        return self._index.get_versions()

    def has_version(self, version_id):
        """See VersionedFile.has_version."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "has_version is a LBYL scenario")
        return self._index.has_version(version_id)

    __contains__ = has_version

    def _merge_annotations(self, content, parents, parent_texts={},
                           delta=None, annotated=None,
                           left_matching_blocks=None):
        """Merge annotations for content.  This is done by comparing
        the annotations based on changes to the text.
        """
        if left_matching_blocks is not None:
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
        else:
            delta_seq = None
        if annotated:
            for parent_id in parents:
                merge_content = self._get_content(parent_id, parent_texts)
                if (parent_id == parents[0] and delta_seq is not None):
                    seq = delta_seq
                else:
                    seq = patiencediff.PatienceSequenceMatcher(
                        None, merge_content.text(), content.text())
                for i, j, n in seq.get_matching_blocks():
                    if n == 0:
                        continue
                    # this appears to copy (origin, text) pairs across to the
                    # new content for any line that matches the last-checked
                    # parent.
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
        if delta:
            if delta_seq is None:
                reference_content = self._get_content(parents[0], parent_texts)
                new_texts = content.text()
                old_texts = reference_content.text()
                delta_seq = patiencediff.PatienceSequenceMatcher(
                    None, old_texts, new_texts)
            return self._make_line_delta(delta_seq, content)

    def _make_line_delta(self, delta_seq, new_content):
        """Generate a line delta from delta_seq and new_content."""
        diff_hunks = []
        for op in delta_seq.get_opcodes():
            if op[0] == 'equal':
                continue
            diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
        return diff_hunks

    def _get_components_positions(self, version_ids):
        """Produce a map of position data for the components of versions.

        This data is intended to be used for retrieving the knit records.

        A dict of version_id to (record_details, index_memo, next, parents) is
        returned.
        method is the way referenced data should be applied.
        index_memo is the handle to pass to the data access to actually get the
            data
        next is the build-parent of the version, or None for fulltexts.
        parents is the version_ids of the parents of this version
        """
        component_data = {}
        pending_components = version_ids
        while pending_components:
            build_details = self._index.get_build_details(pending_components)
            current_components = set(pending_components)
            pending_components = set()
            for version_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                method = record_details[0]
                if compression_parent is not None:
                    pending_components.add(compression_parent)
                component_data[version_id] = (record_details, index_memo,
                                              compression_parent)
            missing = current_components.difference(build_details)
            if missing:
                raise errors.RevisionNotPresent(missing.pop(), self.filename)
        return component_data

    def _get_content(self, version_id, parent_texts={}):
        """Returns a content object that makes up the specified
        version."""
        cached_version = parent_texts.get(version_id, None)
        if cached_version is not None:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
            return cached_version
        text_map, contents_map = self._get_content_maps([version_id])
        return contents_map[version_id]

    def _check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        self._index.check_versions_present(version_ids)

    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts,
        nostore_sha, random_id, check_content, left_matching_blocks):
        """See VersionedFile.add_lines_with_ghosts()."""
        self._check_add(version_id, lines, random_id, check_content)
        return self._add(version_id, lines, parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _add_lines(self, version_id, parents, lines, parent_texts,
        left_matching_blocks, nostore_sha, random_id, check_content):
        """See VersionedFile.add_lines."""
        self._check_add(version_id, lines, random_id, check_content)
        self._check_versions_present(parents)
        return self._add(version_id, lines[:], parents, self.delta,
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def _check_add(self, version_id, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""
        if contains_whitespace(version_id):
            raise InvalidRevisionId(version_id, self.filename)
        self.check_not_reserved_id(version_id)
        # Technically this could be avoided if we are happy to allow duplicate
        # id insertion when other things than bzr core insert texts, but it
        # seems useful for folk using the knit api directly to have some safety
        # blanket that we can disable.
        if not random_id and self.has_version(version_id):
            raise RevisionAlreadyPresent(version_id, self.filename)
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def _add(self, version_id, lines, parents, delta, parent_texts,
             left_matching_blocks, nostore_sha, random_id):
        """Add a set of lines on top of version specified by parents.

        If delta is true, compress the text as a line-delta against
        the first parent.

        Any versions not present will be converted into ghosts.
        """
        # first thing, if the content is something we don't need to store, find
        # out.
        line_bytes = ''.join(lines)
        digest = sha_string(line_bytes)
        if nostore_sha == digest:
            raise errors.ExistingContent

        present_parents = []
        if parent_texts is None:
            parent_texts = {}
        for parent in parents:
            if self.has_version(parent):
                present_parents.append(parent)

        # can only compress against the left most present parent.
        if (delta and
            (len(present_parents) == 0 or
             present_parents[0] != parents[0])):
            delta = False

        text_length = len(line_bytes)
        options = []
        if lines:
            if lines[-1][-1] != '\n':
                # copy the contents of lines.
                lines = lines[:]
                options.append('no-eol')
                lines[-1] = lines[-1] + '\n'
                line_bytes += '\n'

        if delta:
            # To speed the extract of texts the delta chain is limited
            # to a fixed number of deltas.  This should minimize both
            # I/O and the time spend applying deltas.
            delta = self._check_should_delta(present_parents)

        assert isinstance(version_id, str)
        content = self.factory.make(lines, version_id)
        if delta or (self.factory.annotated and len(present_parents) > 0):
            # Merge annotations from parent texts if needed.
            delta_hunks = self._merge_annotations(content, present_parents,
                parent_texts, delta, self.factory.annotated,
                left_matching_blocks)

        if delta:
            options.append('line-delta')
            store_lines = self.factory.lower_line_delta(delta_hunks)
            size, bytes = self._data._record_to_data(version_id, digest,
                store_lines)
        else:
            options.append('fulltext')
            # isinstance is slower and we have no hierarchy.
            if self.factory.__class__ == KnitPlainFactory:
                # Use the already joined bytes saving iteration time in
                # _record_to_data.
                size, bytes = self._data._record_to_data(version_id, digest,
                    lines, [line_bytes])
            else:
                # get mixed annotation + content and feed it into the
                # serialiser.
                store_lines = self.factory.lower_fulltext(content)
                size, bytes = self._data._record_to_data(version_id, digest,
                    store_lines)

        access_memo = self._data.add_raw_records([size], bytes)[0]
        self._index.add_versions(
            ((version_id, options, access_memo, parents),),
            random_id=random_id)
        return digest, text_length, content

    def check(self, progress_bar=None):
        """See VersionedFile.check()."""

    def _clone_text(self, new_version_id, old_version_id, parents):
        """See VersionedFile.clone_text()."""
        # FIXME RBC 20060228 make fast by only inserting an index with null
        # delta.
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

    def _get_record_map(self, version_ids):
        """Produce a dictionary of knit records.

        :return: {version_id:(record, record_details, digest, next)}
            record
                data returned from read_records
            record_details
                opaque information to pass to parse_record
            digest
                SHA1 digest of the full text after all steps are done
            next
                build-parent of the version, i.e. the leftmost ancestor.
                Will be None if the record is not a delta.
        """
        position_map = self._get_components_positions(version_ids)
        # c = component_id, r = record_details, i_m = index_memo, n = next
        records = [(c, i_m) for c, (r, i_m, n)
                   in position_map.iteritems()]
        record_map = {}
        for component_id, record, digest in \
                self._data.read_records_iter(records):
            (record_details, index_memo, next) = position_map[component_id]
            record_map[component_id] = record, record_details, digest, next

        return record_map

    def get_text(self, version_id):
        """See VersionedFile.get_text"""
        return self.get_texts([version_id])[0]

    def get_texts(self, version_ids):
        return [''.join(l) for l in self.get_line_list(version_ids)]

    def get_line_list(self, version_ids):
        """Return the texts of listed versions as a list of strings."""
        for version_id in version_ids:
            self.check_not_reserved_id(version_id)
        text_map, content_map = self._get_content_maps(version_ids)
        return [text_map[v] for v in version_ids]

    _get_lf_split_line_list = get_line_list

    def _get_content_maps(self, version_ids):
        """Produce maps of text and KnitContents

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
            Both dicts take version_ids as their keys.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        # final output.
        version_ids = list(version_ids)
        multiple_versions = len(version_ids) != 1
        record_map = self._get_record_map(version_ids)

        text_map = {}
        content_map = {}
        final_content = {}
        for version_id in version_ids:
            components = []
            cursor = version_id
            while cursor is not None:
                record, record_details, digest, next = record_map[cursor]
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    break
                cursor = next

            content = None
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    content, delta = self.factory.parse_record(version_id,
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content

            content.cleanup_eol(copy_on_mutate=multiple_versions)
            final_content[version_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise KnitCorrupt(self.filename,
                    '\n  sha-1 %s'
                    '\n  of reconstructed text does not match'
                    '\n  expected sha-1 %s'
                    '\n  for version %s' %
                    (actual_sha, digest, version_id))
            text_map[version_id] = text
        return text_map, final_content

    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                index_memo = self._index.get_position(version_id)
                version_id_records.append((version_id, index_memo))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
                enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)

            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            # XXX: It might be more efficient to yield (version_id,
            # line_iterator) in the future. However for now, this is a simpler
            # change to integrate into the rest of the codebase. RBC 20071110
            for line in line_iterator:
                yield line, version_id

        pb.update('Walking content.', total, total)

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        return self._index.iter_parents(version_ids)

    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()

    __len__ = num_versions

    def annotate_iter(self, version_id):
        """See VersionedFile.annotate_iter."""
        return self.factory.annotate_iter(self, version_id)

    def get_parent_map(self, version_ids):
        """See VersionedFile.get_parent_map."""
        return self._index.get_parent_map(version_ids)

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry(versions, topo_sorted)

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        if isinstance(versions, basestring):
            versions = [versions]
        if not versions:
            return []
        return self._index.get_ancestry_with_ghosts(versions)

    def plan_merge(self, ver_a, ver_b):
        """See VersionedFile.plan_merge."""
        ancestors_b = set(self.get_ancestry(ver_b, topo_sorted=False))
        ancestors_a = set(self.get_ancestry(ver_a, topo_sorted=False))
        annotated_a = self.annotate(ver_a)
        annotated_b = self.annotate(ver_b)
        return merge._plan_annotate_merge(annotated_a, annotated_b,
                                          ancestors_a, ancestors_b)


class _KnitComponentFile(object):
    """One of the files used to implement a knit database"""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename

    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                          filename=self._transport.abspath(self._filename))

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self._filename)


class _KnitIndex(_KnitComponentFile):
    """Manages knit index file.

    The index is already kept in memory and read on startup, to enable
    fast lookups of revision information.  The cursor of the index
    file is always pointing to the end, making it easy to append
    entries.

    _cache is a cache for fast mapping from version id to a Index
    object.

    _history is a cache for fast mapping from indexes to version ids.

    The index data format is dictionary compressed when it comes to
    parent references; an index entry may only have parents with a
    lower index number.  As a result, the index is topologically sorted.

    Duplicate entries may be written to the index for a single version id
    if this is done then the latter one completely replaces the former:
    this allows updates to correct version and parent information.
    Note that the two entries may share the delta, and that successive
    annotations and references MUST point to the first entry.

    The index file on disc contains a header, followed by one line per knit
    record. The same revision can be present in an index file more than once.
    The first occurrence gets assigned a sequence number starting from 0.

    The format of a single line is
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
    REVISION_ID is a utf8-encoded revision id
    FLAGS is a comma separated list of flags about the record. Values include
        no-eol, line-delta, fulltext.
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
        that the compressed data starts at.
    LENGTH is the ascii representation of the length of the data file.
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
        REVISION_ID.
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
        revision id already in the knit that is a parent of REVISION_ID.
    The ' :' marker is the end of record marker.
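    For example (an illustrative record, not from a real index)::

        version-2 fulltext,no-eol 120 80 0 .ghost-version :

    describes a fulltext for 'version-2' whose compressed data occupies
    bytes [120, 200) of the data file, with the record at sequence number 0
    and the ghost 'ghost-version' as its parents.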

    partial writes:
    when a write is interrupted to the index file, it will result in a line
    that does not end in ' :'. If the ' :' is not present at the end of a line,
    or at the end of the file, then the record that is missing it will be
    ignored by the parser.

    When writing new records to the index file, the data is preceded by '\n'
    to ensure that records always start on new lines even if the last write was
    interrupted. As a result it's normal for the last line in the index to be
    missing a trailing newline. One can be added with no harmful effects.
    """

    HEADER = "# bzr knit index 8\n"

    # speed of knit parsing went from 280 ms to 280 ms with slots addition.
    # __slots__ = ['_cache', '_history', '_transport', '_filename']

    def _cache_version(self, version_id, options, pos, size, parents):
        """Cache a version record in the history array and index cache.

        This is inlined into _load_data for performance. KEEP IN SYNC.
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
         indexes).
        """
        # only want the _history index to reference the 1st index entry
        # for version_id
        if version_id not in self._cache:
            index = len(self._history)
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)
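
    # A cache entry is therefore the 6-tuple
    #     (version_id, options, pos, size, parents, index)
    # so e.g. entry[2] is the byte offset, entry[3] the length, and entry[5]
    # the sequence number used for dictionary-compressing parents.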

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        pending = set(versions)
        cache = self._cache
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return tsort.topo_sort(graph.items())

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        cache = self._cache
        graph = {}
        pending = set(versions)
        while pending:
            version = pending.pop()
            parents = cache[version][4]
            # if not completed
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        return tsort.topo_sort(graph.items())

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            if version_id not in self._cache:
                # ghosts are omitted
                continue
            method = self.get_method(version_id)
            parents = self.get_parents_with_ghosts(version_id)
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parents[0]
            noeol = 'no-eol' in self.get_options(version_id)
            index_memo = self.get_position(version_id)
            result[version_id] = (index_memo, compression_parent,
                                  parents, (method, noeol))
        return result

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        parent_map = self.get_parent_map(version_ids)
        parent_map_set = set(parent_map)
        unknown_existence = set()
        for parents in parent_map.itervalues():
            unknown_existence.update(parents)
        unknown_existence.difference_update(parent_map_set)
        present_parents = set(self.get_parent_map(unknown_existence))
        present_parents.update(parent_map_set)
        for version_id, parents in parent_map.iteritems():
            parents = tuple(parent for parent in parents
                if parent in present_parents)
            yield version_id, parents

    def num_versions(self):
        return len(self._history)

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. not topologically sorted."""
        return self._history

    def _version_list_to_index(self, versions):
        result_list = []
        cache = self._cache
        for version in versions:
            if version in cache:
                # -- inlined lookup() --
                result_list.append(str(cache[version][5]))
                # -- end lookup () --
            else:
                result_list.append('.' + version)
        return ' '.join(result_list)
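
    # Example: for parents ['rev-in-cache', 'ghost-rev'], where 'rev-in-cache'
    # sits at sequence number 3 in the index, _version_list_to_index returns
    # '3 .ghost-rev'.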

    def add_version(self, version_id, options, index_memo, parents):
        """Add a version record to the index."""
        self.add_versions(((version_id, options, index_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        :param versions: a list of tuples:
            (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, (index, pos, size), parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                assert isinstance(line, str), \
                    'content must be utf-8 encoded: %r' % (line,)
                lines.append(line)
                self._cache_version(version_id, options, pos, size, tuple(parents))
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise

    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return details needed to access the version.

        .kndx indices do not support split-out data, so return None for the
        index field.

        :return: a tuple (None, data position, size) to hand to the access
            logic to get the record.
        """
        entry = self._cache[version_id]
        return None, entry[2], entry[3]

    def get_method(self, version_id):
        """Return compression method of specified version."""
        try:
            options = self._cache[version_id][1]
        except KeyError:
            raise RevisionNotPresent(version_id, self._filename)
        if 'fulltext' in options:
            return 'fulltext'
        else:
            if 'line-delta' not in options:
                raise errors.KnitIndexUnknownMethod(self._full_path(), options)
            return 'line-delta'

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        return self._cache[version_id][1]

    def get_parent_map(self, version_ids):
        """Passed through by KnitVersionedFile.get_parent_map."""
        result = {}
        for version_id in version_ids:
            try:
                result[version_id] = tuple(self._cache[version_id][4])
            except KeyError:
                pass
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)


class KnitGraphIndex(object):
    """A knit index that builds on GraphIndex."""

    def __init__(self, graph_index, deltas=False, parents=True, add_callback=None):
        """Construct a KnitGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param deltas: Allow delta-compressed records.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param parents: If True, record knits parents, if not do not record
            parents.
        """
        self._graph_index = graph_index
        self._deltas = deltas
        self._add_callback = add_callback
        self._parents = parents
        if deltas and not parents:
            raise KnitCorrupt(self, "Cannot do delta compression without "
                "parent tracking.")

    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        :param keys: An iterable of index keys, - 1-tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise RevisionNotPresent(missing_keys.pop(), self)

    def _present_keys(self, version_ids):
        return set([
            node[1] for node in self._get_entries(version_ids)])

    def _parentless_ancestry(self, versions):
        """Honour the get_ancestry API for parentless knit indices."""
        wanted_keys = self._version_ids_to_keys(versions)
        present_keys = self._present_keys(wanted_keys)
        missing = set(wanted_keys).difference(present_keys)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)
        return list(self._keys_to_version_ids(present_keys))

    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be a index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        ghosts = set()
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            found = set()
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                # don't ask for ghosts - otherwise
                # we can end up looping with pending
                # being entirely ghosted.
                graph[key] = [parent for parent in node_refs[0]
                    if parent not in ghosts]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
                found.add(key)
            ghosts.update(this_iteration.difference(found))
        if versions.difference(graph):
            raise RevisionNotPresent(versions.difference(graph).pop(), self)
        if topo_sorted:
            result_keys = tsort.topo_sort(graph.items())
        else:
            result_keys = graph.iterkeys()
        return [key[0] for key in result_keys]

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry."""
        if not self._parents:
            return self._parentless_ancestry(versions)
        # XXX: This will do len(history) index calls - perhaps
        # it should be altered to be a index core feature?
        # get a graph of all the mentioned versions:
        graph = {}
        versions = self._version_ids_to_keys(versions)
        pending = set(versions)
        while pending:
            # get all pending nodes
            this_iteration = pending
            new_nodes = self._get_entries(this_iteration)
            pending = set()
            for (index, key, value, node_refs) in new_nodes:
                graph[key] = node_refs[0]
                # queue parents
                for parent in graph[key]:
                    # don't examine known nodes again
                    if parent in graph:
                        continue
                    pending.add(parent)
            missing_versions = this_iteration.difference(graph)
            missing_needed = versions.intersection(missing_versions)
            if missing_needed:
                raise RevisionNotPresent(missing_needed.pop(), self)
            for missing_version in missing_versions:
                # add a key, no parents
                graph[missing_version] = []
                pending.discard(missing_version) # don't look for it
        result_keys = tsort.topo_sort(graph.items())
        return [key[0] for key in result_keys]

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        entries = self._get_entries(self._version_ids_to_keys(version_ids), True)
        for entry in entries:
            version_id = self._keys_to_version_ids((entry[1],))[0]
            if not self._parents:
                parents = ()
            else:
                parents = self._keys_to_version_ids(entry[3][0])
            if not self._deltas:
                compression_parent = None
            else:
                compression_parent_key = self._compression_parent(entry)
                if compression_parent_key:
                    compression_parent = self._keys_to_version_ids(
                        (compression_parent_key,))[0]
                else:
                    compression_parent = None
            noeol = (entry[2][0] == 'N')
            if compression_parent:
                method = 'line-delta'
            else:
                method = 'fulltext'
            result[version_id] = (self._node_to_position(entry),
                                  compression_parent, parents,
                                  (method, noeol))
        return result
1836
def _compression_parent(self, an_entry):
1837
# return the key that an_entry is compressed against, or None
1838
# Grab the second parent list (as deltas implies parents currently)
1839
compression_parents = an_entry[3][1]
1840
if not compression_parents:
1842
assert len(compression_parents) == 1
1843
return compression_parents[0]

    def _get_method(self, node):
        if not self._deltas:
            return 'fulltext'
        if self._compression_parent(node):
            return 'line-delta'
        else:
            return 'fulltext'

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        if self._parents:
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
            all_parents = set()
            present_parents = set()
            for node in all_nodes:
                all_parents.update(node[3][0])
                # any node we are querying must be present
                present_parents.add(node[1])
            unknown_parents = all_parents.difference(present_parents)
            present_parents.update(self._present_keys(unknown_parents))
            for node in all_nodes:
                parents = []
                for parent in node[3][0]:
                    if parent in present_parents:
                        parents.append(parent[0])
                yield node[1][0], tuple(parents)
        else:
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
                yield node[1][0], ()

    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    __len__ = num_versions

    def get_versions(self):
        """Get all the versions in the file. Not topologically sorted."""
        return [node[1][0] for node in self._graph_index.iter_all_entries()]

    def has_version(self, version_id):
        """True if the version is in the index."""
        return len(self._present_keys(self._version_ids_to_keys([version_id]))) == 1

    def _keys_to_version_ids(self, keys):
        return tuple(key[0] for key in keys)

    def get_position(self, version_id):
        """Return details needed to access the version.

        :return: a tuple (index, data position, size) to hand to the access
            logic to get the record.
        """
        node = self._get_node(version_id)
        return self._node_to_position(node)

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])
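
    # The index 'value' string decoded above packs the no-eol flag and the
    # byte coordinates into one field: a leading 'N' or ' ', then
    # "<pos> <size>". A minimal round trip over made-up numbers, mirroring
    # _node_to_position and the encoding in add_versions:
    @staticmethod
    def _demo_value_round_trip():
        pos, size, no_eol = 1234, 567, True
        value = ('N' if no_eol else ' ') + "%d %d" % (pos, size)
        bits = value[1:].split(' ')
        assert (int(bits[0]), int(bits[1])) == (pos, size)
        assert (value[0] == 'N') == no_eol
        return value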

    def get_method(self, version_id):
        """Return compression method of specified version."""
        return self._get_method(self._get_node(version_id))

    def _get_node(self, version_id):
        try:
            return list(self._get_entries(self._version_ids_to_keys([version_id])))[0]
        except IndexError:
            raise RevisionNotPresent(version_id, self)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        node = self._get_node(version_id)
        options = [self._get_method(node)]
        if node[2][0] == 'N':
            options.append('no-eol')
        return options

    def get_parent_map(self, version_ids):
        """Passed through to by KnitVersionedFile.get_parent_map."""
        nodes = self._get_entries(self._version_ids_to_keys(version_ids))
        result = {}
        if self._parents:
            for node in nodes:
                result[node[1][0]] = self._keys_to_version_ids(node[3][0])
        else:
            for node in nodes:
                result[node[1][0]] = ()
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        keys = self._version_ids_to_keys(version_ids)
        present = self._present_keys(keys)
        missing = keys.difference(present)
        if missing:
            raise RevisionNotPresent(missing.pop(), self)

    def add_version(self, version_id, options, access_memo, parents):
        """Add a version record to the index."""
        return self.add_versions(((version_id, options, access_memo, parents),))

    def add_versions(self, versions, random_id=False):
        """Add multiple versions to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param versions: a list of tuples:
                         (version_id, options, pos, size, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        # anywhere.
        keys = {}
        for (version_id, options, access_memo, parents) in versions:
            index, pos, size = access_memo
            key = (version_id, )
            parents = tuple((parent, ) for parent in parents)
            if 'no-eol' in options:
                value = 'N'
            else:
                value = ' '
            value += "%d %d" % (pos, size)
            if not self._deltas:
                if 'line-delta' in options:
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
            if self._parents:
                if self._deltas:
                    if 'line-delta' in options:
                        node_refs = (parents, (parents[0],))
                    else:
                        node_refs = (parents, ())
                else:
                    node_refs = (parents, )
            else:
                if parents:
                    raise KnitCorrupt(self, "attempt to add node with parents "
                        "in parentless index.")
                node_refs = ()
            keys[key] = (value, node_refs)
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                if (value, node_refs) != keys[key]:
                    raise KnitCorrupt(self, "inconsistent details in add_versions"
                        ": %s %s" % ((value, node_refs), keys[key]))
                del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)

    def _version_ids_to_keys(self, version_ids):
        return set((version_id, ) for version_id in version_ids)
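

# add_versions above reduces each version to a GraphIndex node of the form
# (key, value, node_refs): a 1-tuple key, the packed value string, and one
# reference list per kind (all parents, then the compression parent for
# line-deltas). A sketch with invented version ids, for a delta-compressed,
# parent-tracking index:
def _demo_graph_index_node():
    version_id, parents = 'rev-2', ('rev-1',)
    pos, size, options = 100, 50, ['line-delta']
    key = (version_id, )
    parent_keys = tuple((parent, ) for parent in parents)
    value = ('N' if 'no-eol' in options else ' ') + "%d %d" % (pos, size)
    if 'line-delta' in options:
        # delta: reference both the parents and the compression parent
        node_refs = (parent_keys, (parent_keys[0],))
    else:
        node_refs = (parent_keys, ())
    assert node_refs == ((('rev-1',),), (('rev-1',),))
    return key, value, node_refs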


class _KnitAccess(object):
    """Access to knit records in a .knit file."""

    def __init__(self, transport, filename, _file_mode, _dir_mode,
        _need_to_create, _create_parent_dir):
        """Create a _KnitAccess for accessing and inserting data.

        :param transport: The transport the .knit is located on.
        :param filename: The filename of the .knit.
        """
        self._transport = transport
        self._filename = filename
        self._file_mode = _file_mode
        self._dir_mode = _dir_mode
        self._need_to_create = _need_to_create
        self._create_parent_dir = _create_parent_dir

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to wherever the access method is storing data.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is always None
            for the .knit access method.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        if not self._need_to_create:
            base = self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                   create_parent_dir=self._create_parent_dir,
                                   mode=self._file_mode,
                                   dir_mode=self._dir_mode)
            self._need_to_create = False
            base = 0
        result = []
        for size in sizes:
            result.append((None, base, size))
            base += size
        return result

    def create(self):
        """IFF this data access has its own storage area, initialise it.

        :return: None.
        """
        self._transport.put_bytes_non_atomic(self._filename, '',
                                             mode=self._file_mode)

    def open_file(self):
        """IFF this data access can be represented as a single file, open it.

        For knits that are not mapped to a single file on disk this will
        always return None.

        :return: None or a file handle.
        """
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The .knit method ignores
            the index as there is always only a single file.
        :return: An iterator over the bytes of the records.
        """
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
        for pos, data in self._transport.readv(self._filename, read_vector):
            yield data
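

# The memos handed back by _KnitAccess.add_raw_records are (index, pos,
# size) tuples with index always None, so get_raw_records only needs the
# (pos, size) pairs to build its readv request. Sketch over invented memos:
def _demo_knit_readv_vector():
    memos = [(None, 0, 120), (None, 120, 80), (None, 300, 40)]
    read_vector = [(pos, size) for (index, pos, size) in memos]
    assert read_vector == [(0, 120), (120, 80), (300, 40)]
    return read_vector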


class _PackAccess(object):
    """Access to knit records via a collection of packs."""

    def __init__(self, index_to_packs, writer=None):
        """Create a _PackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param writer: A tuple (pack.ContainerWriter, write_index) which
            contains the pack to write, and the index that reads from it will
            be associated with.
        """
        if writer:
            self.container_writer = writer[0]
            self.write_index = writer[1]
        else:
            self.container_writer = None
            self.write_index = None
        self.indices = index_to_packs

    def add_raw_records(self, sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is a
            tuple - (index, pos, length), where the index field is the
            write_index object supplied to the PackAccess object.
        """
        assert type(raw_data) == str, \
            'data must be plain bytes was %s' % type(raw_data)
        result = []
        offset = 0
        for size in sizes:
            p_offset, p_length = self.container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self.write_index, p_offset, p_length))
        return result

    def create(self):
        """Pack based knits do not get individually created."""

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            transport, path = self.indices[index]
            reader = pack.make_readv_reader(transport, path, offsets)
            for names, read_func in reader.iter_records():
                yield read_func(None)
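
    # The first pass above coalesces consecutive memos that share an index,
    # so each pack is opened once per run of requests rather than once per
    # record. The same grouping on plain data (strings stand in for index
    # objects; all values invented):
    @staticmethod
    def _demo_group_by_index():
        memos = [('pack-a', 0, 10), ('pack-a', 10, 20), ('pack-b', 5, 15),
                 ('pack-a', 50, 5)]
        request_lists = []
        current_index = None
        current_list = []
        for (index, offset, length) in memos:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        if current_index is not None:
            request_lists.append((current_index, current_list))
        # only adjacent runs coalesce; 'pack-a' appears twice
        assert request_lists == [('pack-a', [(0, 10), (10, 20)]),
                                 ('pack-b', [(5, 15)]),
                                 ('pack-a', [(50, 5)])]
        return request_lists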

    def open_file(self):
        """Pack based knits have no single file."""
        return None

    def set_writer(self, writer, index, (transport, packname)):
        """Set a writer to use for adding data."""
        if index is not None:
            self.indices[index] = (transport, packname)
        self.container_writer = writer
        self.write_index = index


class _StreamAccess(object):
    """A Knit Access object that provides data from a datastream.

    It also provides a fallback to present annotated data from a *backing*
    access object as unannotated data.

    This is triggered by an index_memo which is pointing to a different index
    than this was constructed with, and is used to allow extracting full
    unannotated texts for insertion into annotated knits.
    """

    def __init__(self, reader_callable, stream_index, backing_knit,
        orig_factory):
        """Create a _StreamAccess object.

        :param reader_callable: The reader_callable from the datastream.
            This is called to buffer all the data immediately, for
            random access.
        :param stream_index: The index the data stream this provides access to
            which will be present in native index_memo's.
        :param backing_knit: The knit object that will provide access to
            annotated texts which are not available in the stream, so as to
            create unannotated texts.
        :param orig_factory: The original content factory used to generate the
            stream. This is used for checking whether the thunk code for
            supporting _copy_texts will generate the correct form of data.
        """
        self.data = reader_callable(None)
        self.stream_index = stream_index
        self.backing_knit = backing_knit
        self.orig_factory = orig_factory

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for records.

        :param memos_for_retrieval: An iterable containing the (thunk_flag,
            index, start, end) memo for retrieving the bytes.
        :return: An iterator over the bytes of the records.
        """
        # use a generator for memory friendliness
        for thunk_flag, version_id, start, end in memos_for_retrieval:
            if version_id is self.stream_index:
                yield self.data[start:end]
                continue
            # we have been asked to thunk. This thunking only occurs when
            # we are obtaining plain texts from an annotated backing knit
            # so that _copy_texts will work.
            # We could improve performance here by scanning for where we need
            # to do this and using get_line_list, then interleaving the output
            # as desired. However, for now, this is sufficient.
            if self.orig_factory.__class__ != KnitPlainFactory:
                raise errors.KnitCorrupt(
                    self, 'Bad thunk request %r' % version_id)
            lines = self.backing_knit.get_lines(version_id)
            line_bytes = ''.join(lines)
            digest = sha_string(line_bytes)
            if lines:
                if lines[-1][-1] != '\n':
                    lines[-1] = lines[-1] + '\n'
                    line_bytes += '\n'
            orig_options = list(self.backing_knit._index.get_options(version_id))
            if 'fulltext' not in orig_options:
                if 'line-delta' not in orig_options:
                    raise errors.KnitCorrupt(self,
                        'Unknown compression method %r' % orig_options)
                orig_options.remove('line-delta')
                orig_options.append('fulltext')
            # We want plain data, because we expect to thunk only to allow text
            # extraction.
            size, bytes = self.backing_knit._data._record_to_data(version_id,
                digest, lines, line_bytes)
            yield bytes


class _StreamIndex(object):
    """A Knit Index object that uses the data map from a datastream."""

    def __init__(self, data_list, backing_index):
        """Create a _StreamIndex object.

        :param data_list: The data_list from the datastream.
        :param backing_index: The index which will supply values for nodes
            referenced outside of this stream.
        """
        self.data_list = data_list
        self.backing_index = backing_index
        self._by_version = {}
        pos = 0
        for key, options, length, parents in data_list:
            self._by_version[key] = options, (pos, pos + length), parents
            pos += length
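
    # Records in a datastream are concatenated back to back, so the map
    # built above can be reconstructed from lengths alone: each version's
    # slice starts where the previous one ended. Sketch with a fabricated
    # data_list:
    @staticmethod
    def _demo_stream_slices():
        data_list = [('rev-1', ['fulltext'], 100, ()),
                     ('rev-2', ['line-delta'], 40, ('rev-1',))]
        by_version = {}
        pos = 0
        for key, options, length, parents in data_list:
            by_version[key] = (options, (pos, pos + length), parents)
            pos += length
        assert by_version['rev-2'][1] == (100, 140)
        return by_version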

    def get_ancestry(self, versions, topo_sorted):
        """Get an ancestry list for versions."""
        if topo_sorted:
            # Not needed for basic joins
            raise NotImplementedError(self.get_ancestry)
        # get a graph of all the mentioned versions:
        # Little ugly - basically copied from KnitIndex, but don't want to
        # accidentally incorporate too much of that index's code.
        ancestry = set()
        pending = set(versions)
        cache = self._by_version
        while pending:
            version = pending.pop()
            # trim ghosts
            try:
                parents = [p for p in cache[version][2] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in ancestry])
            ancestry.add(version)
        return list(ancestry)

    def get_build_details(self, version_ids):
        """Get the method, index_memo and compression parent for version_ids.

        Ghosts are omitted from the result.

        :param version_ids: An iterable of version_ids.
        :return: A dict of version_id:(index_memo, compression_parent,
                                       parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        result = {}
        for version_id in version_ids:
            try:
                method = self.get_method(version_id)
            except errors.RevisionNotPresent:
                # ghosts are omitted
                continue
            parent_ids = self.get_parents_with_ghosts(version_id)
            noeol = ('no-eol' in self.get_options(version_id))
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parent_ids[0]
            index_memo = self.get_position(version_id)
            result[version_id] = (index_memo, compression_parent,
                                  parent_ids, (method, noeol))
        return result

    def get_method(self, version_id):
        """Return compression method of specified version."""
        try:
            options = self._by_version[version_id][0]
        except KeyError:
            # Strictly speaking this should check in the backing knit, but
            # until we have a test to discriminate, this will do.
            return self.backing_index.get_method(version_id)
        if 'fulltext' in options:
            return 'fulltext'
        elif 'line-delta' in options:
            return 'line-delta'
        else:
            raise errors.KnitIndexUnknownMethod(self, options)

    def get_options(self, version_id):
        """Return a list representing options.

        e.g. ['foo', 'bar']
        """
        try:
            return self._by_version[version_id][0]
        except KeyError:
            return self.backing_index.get_options(version_id)

    def get_parent_map(self, version_ids):
        """Passed through to by KnitVersionedFile.get_parent_map."""
        result = {}
        pending_ids = set()
        for version_id in version_ids:
            try:
                result[version_id] = self._by_version[version_id][2]
            except KeyError:
                pending_ids.add(version_id)
        result.update(self.backing_index.get_parent_map(pending_ids))
        return result

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        try:
            return self.get_parent_map([version_id])[version_id]
        except KeyError:
            raise RevisionNotPresent(version_id, self)

    def get_position(self, version_id):
        """Return details needed to access the version.

        _StreamAccess has the data as a big array, so we return slice
        coordinates into that (as index_memo's are opaque outside the
        index and matching access class).

        :return: a tuple (thunk_flag, index, start, end).  If thunk_flag is
            False, index will be self, otherwise it will be a version id.
        """
        try:
            start, end = self._by_version[version_id][1]
            return False, self, start, end
        except KeyError:
            # Signal to the access object to handle this from the backing knit.
            return (True, version_id, None, None)
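
    # A position memo from this index is (thunk_flag, index, start, end):
    # when thunk_flag is False the record is a byte slice of the buffered
    # stream, when True the second field degrades to a bare version id that
    # the access object must fetch from the backing knit. Toy dispatcher
    # over fabricated memos and stream bytes:
    @staticmethod
    def _demo_thunk_dispatch():
        stream_data = 'aaaabbbb'
        memos = [(False, None, 0, 4), (True, 'rev-x', None, None)]
        out = []
        for thunk_flag, index, start, end in memos:
            if not thunk_flag:
                out.append(stream_data[start:end])
            else:
                out.append('thunk:%s' % index)
        assert out == ['aaaa', 'thunk:rev-x']
        return out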

    def get_versions(self):
        """Get all the versions in the stream."""
        return self._by_version.keys()

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        result = []
        for version in version_ids:
            try:
                result.append((version, self._by_version[version][2]))
            except KeyError:
                pass
        return result


class _KnitData(object):
    """Manage extraction of data from a KnitAccess, caching and decompressing.

    The KnitData class provides the logic for parsing and using knit records,
    making use of an access method for the low level read and write operations.
    """

    def __init__(self, access):
        """Create a KnitData object.

        :param access: The access method to use. Access methods such as
            _KnitAccess manage the insertion of raw records and the subsequent
            retrieval of the same.
        """
        self._access = access
        self._checked = False
        # TODO: jam 20060713 conceptually, this could spill to disk
        #       if the cached size gets larger than a certain amount
        #       but it complicates the model a bit, so for now just use
        #       a simple dictionary
        self._cache = {}
        self._do_cache = False

    def enable_cache(self):
        """Enable caching of reads."""
        self._do_cache = True

    def clear_cache(self):
        """Clear the record cache."""
        self._do_cache = False
        self._cache = {}

    def _open_file(self):
        return self._access.open_file()

    def _record_to_data(self, version_id, digest, lines, dense_lines=None):
        """Convert version_id, digest, lines into a raw data block.

        :param dense_lines: The bytes of lines but in a denser form. For
            instance, if lines is a list of 1000 bytestrings each ending in \n,
            dense_lines may be a list with one line in it, containing all the
            1000's lines and their \n's. Using dense_lines if it is already
            known is a win because the string join to create bytes in this
            function spends less time resizing the final string.
        :return: (len, a bytestring containing the raw data ready to write.)
        """
        # Note: using a string copy here increases memory pressure with e.g.
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
        # when doing the initial commit of a mozilla tree. RBC 20070921
        bytes = ''.join(chain(
            ["version %s %d %s\n" % (version_id,
                                     len(lines),
                                     digest)],
            dense_lines or lines,
            ["end %s\n" % version_id]))
        assert bytes.__class__ == str
        compressed_bytes = bytes_to_gzip(bytes)
        return len(compressed_bytes), compressed_bytes
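
    # Uncompressed, the block built above is a header line, the text lines,
    # then an end marker; _parse_record undoes exactly this framing after
    # gunzipping. A sketch with an invented id and lines:
    @staticmethod
    def _demo_record_framing():
        version_id, lines = 'rev-1', ['hello\n', 'world\n']
        digest = sha_string(''.join(lines))
        raw = ''.join(['version %s %d %s\n' % (version_id, len(lines), digest)]
                      + lines + ['end %s\n' % version_id])
        record_contents = raw.splitlines(True)
        header = record_contents.pop(0)
        last_line = record_contents.pop()
        assert header.split() == ['version', version_id, '2', digest]
        assert last_line == 'end %s\n' % version_id
        assert record_contents == lines
        return raw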

    def add_raw_records(self, sizes, raw_data):
        """Append a prepared record to the data file.

        :param sizes: An iterable containing the size of each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: a list of index data for the way the data was stored.
            See the access method add_raw_records documentation for more
            details.
        """
        return self._access.add_raw_records(sizes, raw_data)

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream.
                 as (stream, header_record)
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        try:
            rec = self._check_header(version_id, df.readline())
        except Exception, e:
            raise KnitCorrupt(self._access,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        return df, rec

    def _check_header(self, version_id, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                              'unexpected number of elements in record header')
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))
        return rec

    def _parse_record(self, version_id, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))

        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._check_header(version_id, header)

        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 version_id))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, version_id))
        df.close()
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [index_memo for version_id, index_memo
                                             in records
                                             if version_id not in self._cache]
            else:
                needed_offsets = [index_memo for version_id, index_memo
                                             in records]

            raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                data = raw_records.next()
                if self._do_cache:
                    self._cache[version_id] = data

                # validate the header
                df, rec = self._parse_record_header(version_id, data)
                df.close()
            yield version_id, data

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.

        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
                 read, not the order requested
        """
        if not records:
            return

        if self._cache:
            # Skip records we have already seen
            yielded_records = set()
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    if record[0] in yielded_records:
                        continue
                    yielded_records.add(record[0])
                    data = self._cache[record[0]]
                    content, digest = self._parse_record(record[0], data)
                    yield (record[0], content, digest)
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
        else:
            needed_records = sorted(set(records), key=operator.itemgetter(1))

        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        raw_data = self._access.get_raw_records(
            [index_memo for version_id, index_memo in needed_records])

        for (version_id, index_memo), data in \
                izip(iter(needed_records), raw_data):
            content, digest = self._parse_record(version_id, data)
            if self._do_cache:
                self._cache[version_id] = data
            yield version_id, content, digest

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components


class InterKnit(InterVersionedFile):
    """Optimised code paths for knit to knit operations."""

    _matching_file_from_factory = KnitVersionedFile
    _matching_file_to_factory = KnitVersionedFile

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with knits."""
        try:
            return (isinstance(source, KnitVersionedFile) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
        """Copy texts to the target by extracting and adding them one by one.

        see join() for the parameter definitions.
        """
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        # --- the below is factorable out with VersionedFile.join, but wait for
        # VersionedFiles, it may all be simpler then.
        graph = Graph(self.source)
        search = graph._make_breadth_first_searcher(version_ids)
        transitive_ids = set()
        map(transitive_ids.update, list(search))
        parent_map = self.source.get_parent_map(transitive_ids)
        order = tsort.topo_sort(parent_map.items())

        def size_of_content(content):
            return sum(len(line) for line in content.text())
        # Cache at most 10MB of parent texts
        parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                              compute_size=size_of_content)
        # TODO: jam 20071116 It would be nice to have a streaming interface to
        #       get multiple texts from a source. The source could be smarter
        #       about how it handled intermediate stages.
        #       get_line_list() or make_mpdiffs() seem like a possibility, but
        #       at the moment they extract all full texts into memory, which
        #       causes us to store more than our 3x fulltext goal.
        #       Repository.iter_files_bytes() may be another possibility
        to_process = [version for version in order
                      if version not in self.target]
        total = len(to_process)
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for index, version in enumerate(to_process):
                pb.update('Converting versioned data', index, total)
                sha1, num_bytes, parent_text = self.target.add_lines(version,
                    self.source.get_parents_with_ghosts(version),
                    self.source.get_lines(version),
                    parent_texts=parent_cache)
                parent_cache[version] = parent_text
        finally:
            pb.finished()
        return total
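
    # LRUSizeCache above bounds the parent-text cache by total byte size
    # rather than by entry count. A much-simplified stand-in (no LRU
    # ordering, arbitrary eviction; all names and numbers invented) showing
    # the size-accounting idea:
    @staticmethod
    def _demo_size_capped_cache():
        max_size = 12
        cache = {}
        total = [0]
        def add(key, text_lines):
            size = sum(len(line) for line in text_lines)
            while cache and total[0] + size > max_size:
                _, (_, old_size) = cache.popitem()
                total[0] -= old_size
            cache[key] = (text_lines, size)
            total[0] += size
        add('a', ['12345\n'])   # 6 bytes
        add('b', ['12345\n'])   # 6 bytes; cache now full
        add('c', ['12345\n'])   # forces an eviction
        assert total[0] <= max_size
        assert 'c' in cache
        return cache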

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, KnitVersionedFile)
        assert isinstance(self.target, KnitVersionedFile)

        # If the source and target are mismatched w.r.t. annotations vs
        # plain, the data needs to be converted accordingly
        if self.source.factory.annotated == self.target.factory.annotated:
            converter = None
        elif self.source.factory.annotated:
            converter = self._anno_to_plain_converter
        else:
            # We're converting from a plain to an annotated knit. Copy them
            # across by full texts.
            return self._copy_texts(pb, msg, version_ids, ignore_missing)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)
            if None in version_ids:
                version_ids.remove(None)

            self.source_ancestry = set(self.source.get_ancestry(version_ids,
                topo_sorted=False))
            this_versions = set(self.target._index.get_versions())
            # XXX: For efficiency we should not look at the whole index,
            #      we only need to consider the referenced revisions - they
            #      must all be present, or the method must be full-text.
            #      TODO, RBC 20070919
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = tsort.topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # plan the join:
            copy_queue = []
            copy_queue_records = []
            copy_set = set()
            for version_id in version_list:
                options = self.source._index.get_options(version_id)
                parents = self.source._index.get_parents_with_ghosts(version_id)
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must:
                    # * already have it or
                    # * have it scheduled already
                    # otherwise we don't care
                    assert (self.target.has_version(parent) or
                            parent in copy_set or
                            not self.source.has_version(parent))
                index_memo = self.source._index.get_position(version_id)
                copy_queue_records.append((version_id, index_memo))
                copy_queue.append((version_id, options, parents))
                copy_set.add(version_id)

            # data suck the join:
            count = 0
            total = len(version_list)
            raw_datum = []
            raw_records = []
            for (version_id, raw_data), \
                (version_id2, options, parents) in \
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
                     copy_queue):
                assert version_id == version_id2, 'logic error, inconsistent results'
                count = count + 1
                pb.update("Joining knit", count, total)
                if converter:
                    size, raw_data = converter(raw_data, version_id, options,
                        parents)
                else:
                    size = len(raw_data)
                raw_records.append((version_id, options, parents, size))
                raw_datum.append(raw_data)
            self.target._add_raw_records(raw_records, ''.join(raw_datum))
            return count
        finally:
            pb.finished()

    def _anno_to_plain_converter(self, raw_data, version_id, options,
                                 parents):
        """Convert annotated content to plain content."""
        data, digest = self.source._data._parse_record(version_id, raw_data)
        if 'fulltext' in options:
            content = self.source.factory.parse_fulltext(data, version_id)
            lines = self.target.factory.lower_fulltext(content)
        else:
            delta = self.source.factory.parse_line_delta(data, version_id,
                plain=True)
            lines = self.target.factory.lower_line_delta(delta)
        return self.target._data._record_to_data(version_id, digest, lines)


InterVersionedFile.register_optimiser(InterKnit)


class WeaveToKnit(InterVersionedFile):
    """Optimised code paths for weave to knit operations."""

    _matching_file_from_factory = bzrlib.weave.WeaveFile
    _matching_file_to_factory = KnitVersionedFile

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with weaves to knits."""
        try:
            return (isinstance(source, bzrlib.weave.Weave) and
                    isinstance(target, KnitVersionedFile))
        except AttributeError:
            return False

    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
        """See InterVersionedFile.join."""
        assert isinstance(self.source, bzrlib.weave.Weave)
        assert isinstance(self.target, KnitVersionedFile)

        version_ids = self._get_source_version_ids(version_ids, ignore_missing)

        if not version_ids:
            return 0

        pb = ui.ui_factory.nested_progress_bar()
        try:
            version_ids = list(version_ids)

            self.source_ancestry = set(self.source.get_ancestry(version_ids))
            this_versions = set(self.target._index.get_versions())
            needed_versions = self.source_ancestry - this_versions

            if not needed_versions:
                return 0
            full_list = tsort.topo_sort(
                self.source.get_parent_map(self.source.versions()))

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            # do the join:
            count = 0
            total = len(version_list)
            parent_map = self.source.get_parent_map(version_list)
            for version_id in version_list:
                pb.update("Converting to knit", count, total)
                parents = parent_map[version_id]
                # check that it will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must already have it
                    assert (self.target.has_version(parent))
                self.target.add_lines(
                    version_id, parents, self.source.get_lines(version_id))
                count = count + 1
            return count
        finally:
            pb.finished()


InterVersionedFile.register_optimiser(WeaveToKnit)


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

    This implementation is for knits with no cached annotations.
    It will work for knits with cached annotations, but this is not
    recommended.
    """
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate(revision_id))


class _KnitAnnotator(object):
    """Build up the annotations for a text."""

    def __init__(self, knit):
        self._knit = knit

        # Content objects, differs from fulltexts because of how final newlines
        # are treated by knits. The content objects here will always have a
        # final newline.
        self._fulltext_contents = {}

        # Annotated lines of specific revisions
        self._annotated_lines = {}

        # Track the raw data for nodes that we could not process yet.
        # This maps the revision_id of the base to a list of children that
        # will be annotated from it.
        self._pending_children = {}

        # Nodes which cannot be extracted
        self._ghosts = set()

        # Track how many children this node has, so we know if we need to keep
        # it
        self._annotate_children = {}
        self._compression_children = {}

        self._all_build_details = {}
        # The children => parent revision_id graph
        self._revision_id_graph = {}

        self._heads_provider = None

        self._nodes_to_keep_annotations = set()
        self._generations_until_keep = 100

    def set_generations_until_keep(self, value):
        """Set the number of generations before caching a node.

        Setting this to -1 will cache every merge node, setting this higher
        will cache fewer nodes.
        """
        self._generations_until_keep = value

    def _add_fulltext_content(self, revision_id, content_obj):
        self._fulltext_contents[revision_id] = content_obj
        # TODO: jam 20080305 It might be good to check the sha1digest here
        return content_obj.text()

    def _check_parents(self, child, nodes_to_annotate):
        """Check if all parents have been processed.

        :param child: A tuple of (rev_id, parents, raw_content)
        :param nodes_to_annotate: If child is ready, add it to
            nodes_to_annotate, otherwise put it back in self._pending_children
        """
        for parent_id in child[1]:
            if (parent_id not in self._annotated_lines):
                # This parent is not ready yet, so park the child until all
                # of its parents have been processed
                self._pending_children.setdefault(parent_id,
                                                  []).append(child)
                break
        else:
            # This one is ready to be processed
            nodes_to_annotate.append(child)
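
    # _check_parents leans on Python's for/else: the else clause runs only
    # when the loop never hit break, i.e. every parent was already
    # annotated. The same shape on toy data (all ids invented):
    @staticmethod
    def _demo_for_else_readiness():
        annotated = set(['p1'])
        pending = {}
        child = ('rev-9', ('p1', 'p2'), 'raw-bytes')
        ready = []
        for parent_id in child[1]:
            if parent_id not in annotated:
                pending.setdefault(parent_id, []).append(child)
                break
        else:
            ready.append(child)
        assert 'p2' in pending and not ready
        return ready, pending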

    def _add_annotation(self, revision_id, fulltext, parent_ids,
                        left_matching_blocks=None):
        """Add an annotation entry.

        All parents should already have been annotated.
        :return: A list of children that now have their parents satisfied.
        """
        a = self._annotated_lines
        annotated_parent_lines = [a[p] for p in parent_ids]
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
            fulltext, revision_id, left_matching_blocks,
            heads_provider=self._get_heads_provider()))
        self._annotated_lines[revision_id] = annotated_lines
        for p in parent_ids:
            ann_children = self._annotate_children[p]
            ann_children.remove(revision_id)
            if (not ann_children
                and p not in self._nodes_to_keep_annotations):
                del self._annotated_lines[p]
                del self._all_build_details[p]
                if p in self._fulltext_contents:
                    del self._fulltext_contents[p]
        # Now that we've added this one, see if there are any pending
        # deltas to be done, certainly this parent is finished
        nodes_to_annotate = []
        for child in self._pending_children.pop(revision_id, []):
            self._check_parents(child, nodes_to_annotate)
        return nodes_to_annotate

    def _get_build_graph(self, revision_id):
        """Get the graphs for building texts and annotations.

        The data you need for creating a full text may be different than the
        data you need to annotate that text. (At a minimum, you need both
        parents to create an annotation, but only need 1 parent to generate
        the fulltext.)

        :return: A list of (revision_id, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data from
            the pack file.
        """
        if revision_id in self._annotated_lines:
            # Nothing to do
            return []
        pending = set([revision_id])
        records = []
        generation = 0
        kept_generation = 0
        while pending:
            # get all pending nodes
            generation += 1
            this_iteration = pending
            build_details = self._knit._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._knit._index._get_entries(this_iteration)
            pending = set()
            for rev_id, details in build_details.iteritems():
                (index_memo, compression_parent, parents,
                 record_details) = details
                self._revision_id_graph[rev_id] = parents
                records.append((rev_id, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update(p for p in parents
                                 if p not in self._all_build_details)
                if compression_parent:
                    self._compression_children.setdefault(compression_parent,
                        []).append(rev_id)
                if parents:
                    for parent in parents:
                        self._annotate_children.setdefault(parent,
                            set()).add(rev_id)
                    num_gens = generation - kept_generation
                    if ((num_gens >= self._generations_until_keep)
                        and len(parents) > 1):
                        kept_generation = generation
                        self._nodes_to_keep_annotations.add(rev_id)

            missing_versions = this_iteration.difference(build_details.keys())
            self._ghosts.update(missing_versions)
            for missing_version in missing_versions:
                # add a key, no parents
                self._revision_id_graph[missing_version] = ()
                pending.discard(missing_version) # don't look for it
        # XXX: This should probably be a real exception, as it is a data
        #      inconsistency
        assert not self._ghosts.intersection(self._compression_children), \
            "We cannot have nodes which have a compression parent of a ghost."
        # Cleanout anything that depends on a ghost so that we don't wait for
        # the ghost to show up
        for node in self._ghosts:
            if node in self._annotate_children:
                # We won't be building this node
                del self._annotate_children[node]
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records

    def _annotate_records(self, records):
        """Build the annotations for the listed records."""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        for (rev_id, record,
             digest) in self._knit._data.read_records_iter(records):
            if rev_id in self._annotated_lines:
                continue
            parent_ids = self._revision_id_graph[rev_id]
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
            details = self._all_build_details[rev_id]
            (index_memo, compression_parent, parents,
             record_details) = details
            nodes_to_annotate = []
            # TODO: Remove the punning between compression parents and
            #       parent_ids; we should be able to do this without
            #       conflating the two.
            if len(parent_ids) == 0:
                # There are no parents for this node, so just add it
                # TODO: This probably needs to be decoupled
                assert compression_parent is None
                fulltext_content, delta = self._knit.factory.parse_record(
                    rev_id, record, record_details, None)
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
                    parent_ids, left_matching_blocks=None))
            else:
                child = (rev_id, parent_ids, record)
                # Check if all the parents are present
                self._check_parents(child, nodes_to_annotate)
            while nodes_to_annotate:
                # Should we use a queue here instead of a stack?
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
                (index_memo, compression_parent, parents,
                 record_details) = self._all_build_details[rev_id]
                if compression_parent is not None:
                    comp_children = self._compression_children[compression_parent]
                    assert rev_id in comp_children
                    # If there is only 1 child, it is safe to reuse this
                    # content
                    reuse_content = (len(comp_children) == 1
                        and compression_parent not in
                            self._nodes_to_keep_annotations)
                    if reuse_content:
                        # Remove it from the cache since it will be changing
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
                        # Make sure to copy the fulltext since it might be
                        # modified
                        parent_fulltext = list(parent_fulltext_content.text())
                    else:
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
                        parent_fulltext = parent_fulltext_content.text()
                    comp_children.remove(rev_id)
                    fulltext_content, delta = self._knit.factory.parse_record(
                        rev_id, record, record_details,
                        parent_fulltext_content,
                        copy_base_content=(not reuse_content))
                    fulltext = self._add_fulltext_content(rev_id,
                                                          fulltext_content)
                    blocks = KnitContent.get_line_delta_blocks(delta,
                            parent_fulltext, fulltext)
                else:
                    fulltext_content = self._knit.factory.parse_fulltext(
                        record, rev_id)
                    fulltext = self._add_fulltext_content(rev_id,
                        fulltext_content)
                    blocks = None
                nodes_to_annotate.extend(
                    self._add_annotation(rev_id, fulltext, parent_ids,
                                         left_matching_blocks=blocks))

    def _get_heads_provider(self):
        """Create a heads provider for resolving ancestry issues."""
        if self._heads_provider is not None:
            return self._heads_provider
        parent_provider = _mod_graph.DictParentsProvider(
            self._revision_id_graph)
        graph_obj = _mod_graph.Graph(parent_provider)
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
        self._heads_provider = head_cache
        return head_cache

    def annotate(self, revision_id):
        """Return the annotated fulltext at the given revision.

        :param revision_id: The revision id for this file
        """
        records = self._get_build_graph(revision_id)
        if revision_id in self._ghosts:
            raise errors.RevisionNotPresent(revision_id, self._knit)
        self._annotate_records(records)
        return self._annotated_lines[revision_id]


try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data