 from cStringIO import StringIO
 from itertools import izip, chain
 import bzrlib.errors as errors
 from bzrlib.errors import FileExists, NoSuchFile, KnitError, \
         InvalidRevisionId, KnitCorrupt, KnitHeaderError, \
         RevisionNotPresent, RevisionAlreadyPresent
-from bzrlib.tuned_gzip import *
+from bzrlib.tuned_gzip import GzipFile
 from bzrlib.trace import mutter
 from bzrlib.osutils import contains_whitespace, contains_linebreaks, \
+from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
+from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import VersionedFile, InterVersionedFile
-from bzrlib.tsort import topo_sort

 # TODO: Split out code specific to this format into an associated object.
         see parse_fulltext which this inverts.
-        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
+        encode_utf8 = cache_utf8.encode
+        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]

     def lower_line_delta(self, delta):
         """convert a delta into a serializable form.
         See parse_line_delta which this inverts.
+        encode_utf8 = cache_utf8.encode
         for start, end, c, lines in delta:
             out.append('%d,%d,%d\n' % (start, end, c))
-            for origin, text in lines:
-                out.append('%s %s' % (origin.encode('utf-8'), text))
+            out.extend(encode_utf8(origin) + ' ' + text
+                       for origin, text in lines)
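# A minimal illustrative sketch (values invented, not from the diff) of what
# lower_line_delta() produces: each hunk is a 'start,end,count' header line
# followed by one '<origin> <text>' entry per replacement line, with origins
# utf-8 encoded via cache_utf8.encode.
#
#   delta = [(3, 4, 2, [('rev-2', 'new line one\n'),
#                       ('rev-2', 'new line two\n')])]
#   lower_line_delta(delta)
#   # -> ['3,4,2\n', 'rev-2 new line one\n', 'rev-2 new line two\n']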
     stored and retrieved.
-    def __init__(self, relpath, transport, file_mode=None, access_mode=None, factory=None,
-                 basis_knit=None, delta=True, create=False):
+    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
+                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
+                 create=False, create_parent_dir=False, delay_create=False,
         """Construct a knit at location specified by relpath.
         :param create: If not True, only open an existing knit.
+        :param create_parent_dir: If True, create the parent directory if
+            creating the file fails. (This is used for stores with
+            hash-prefixes that may not exist yet)
+        :param delay_create: The calling code is aware that the knit won't
+            actually be created until the first data is stored.
+        if deprecated_passed(basis_knit):
+            warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
+                 " deprecated as of bzr 0.9.",
+                 DeprecationWarning, stacklevel=2)
         if access_mode is None:
             access_mode = 'w'
         super(KnitVersionedFile, self).__init__(access_mode)
         assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
-        assert not basis_knit or isinstance(basis_knit, KnitVersionedFile), \
         self.transport = transport
         self.filename = relpath
-        self.basis_knit = basis_knit
         self.factory = factory or KnitAnnotateFactory()
         self.writable = (access_mode == 'w')
         self.delta = delta
         self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
-            access_mode, create=create)
+            access_mode, create=create, file_mode=file_mode,
+            create_parent_dir=create_parent_dir, delay_create=delay_create,
         self._data = _KnitData(transport, relpath + DATA_SUFFIX,
-            access_mode, create=not len(self.versions()))
+            access_mode, create=create and not len(self), file_mode=file_mode,
+            create_parent_dir=create_parent_dir, delay_create=delay_create,

+        return '%s(%s)' % (self.__class__.__name__,
+                           self.transport.abspath(self.filename))
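# A hedged usage sketch (paths and names invented) of the new constructor
# arguments documented above: hash-prefixed stores can ask for the parent
# directory to be created on demand and for file creation to be deferred
# until the first data is actually stored.
#
#   knit = KnitVersionedFile('ab/cd/foo', store_transport, create=True,
#                            create_parent_dir=True, delay_create=True)
#   # nothing is written to disk until the first record is added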
     def _add_delta(self, version_id, parents, delta_parent, sha1, noeol, delta):
         """See VersionedFile._add_delta()."""
         self._check_add(version_id, []) # should we check the lines ?
         where, size = self._data.add_record(version_id, digest, store_lines)
         self._index.add_version(version_id, options, where, size, parents)

+    def _add_raw_records(self, records, data):
+        """Add all the records 'records' with data pre-joined in 'data'.
+        :param records: A list of tuples(version_id, options, parents, size).
+        :param data: The data for the records. When it is written, the records
+                     are adjusted to have pos pointing into data by the sum of
+                     the preceding records sizes.
+        pos = self._data.add_raw_record(data)
+        for (version_id, options, parents, size) in records:
+            index_entries.append((version_id, options, pos+offset,
+            if self._data._do_cache:
+                self._data._cache[version_id] = data[offset:offset+size]
+        self._index.add_versions(index_entries)

+    def enable_cache(self):
+        """Start caching data for this knit"""
+        self._data.enable_cache()

     def clear_cache(self):
         """Clear the data cache only."""
         self._data.clear_cache()
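# A small sketch (assumed sizes) of the bookkeeping _add_raw_records() does:
# every index entry gets a position equal to the raw-record start plus the
# cumulative size of the records that precede it in 'data'.
#
#   records = [('rev-1', ['fulltext'], [], 120),
#              ('rev-2', ['line-delta'], ['rev-1'], 45)]
#   # with add_raw_record(data) returning pos = 8192:
#   #   rev-1 is indexed at 8192 (size 120)
#   #   rev-2 is indexed at 8192 + 120 = 8312 (size 45)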
         """See VersionedFile.copy_to()."""
         # copy the current index to a temp index to avoid racing with local
-        transport.put(name + INDEX_SUFFIX + '.tmp', self.transport.get(self._index._filename))
+        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
+                self.transport.get(self._index._filename))
         # copy the data file
-        transport.put(name + DATA_SUFFIX, self._data._open_file())
-        # rename the copied index into place
-        transport.rename(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
+        f = self._data._open_file()
+        transport.put_file(name + DATA_SUFFIX, f)
+        # move the copied index into place
+        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
     def create_empty(self, name, transport, mode=None):
-        return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
+        return KnitVersionedFile(name, transport, factory=self.factory,
+                                 delta=self.delta, create=True)

     def _fix_parents(self, version, new_parents):
         """Fix the parents list for version.

             diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
         return diff_hunks
-    def _get_components(self, version_id):
-        """Return a list of (version_id, method, data) tuples that
-        makes up version specified by version_id of the knit.
-        The components should be applied in the order of the returned
-        The basis knit will be used to the largest extent possible
-        since it is assumed that accesses to it is faster.
+    def _get_components_positions(self, version_ids):
+        """Produce a map of position data for the components of versions.
+        This data is intended to be used for retrieving the knit records.
+        A dict of version_id to (method, data_pos, data_size, next) is
+        method is the way referenced data should be applied.
+        data_pos is the position of the data in the knit.
+        data_size is the size of the data in the knit.
+        next is the build-parent of the version, or None for fulltexts.
-        # 4168 calls in 14912, 2289 internal
-        # 4168 in 9711 to read_records
-        # 52554 in 1250 to get_parents
-        # 170166 in 865 to list.append
-        # needed_revisions holds a list of (method, version_id) of
-        # versions that is needed to be fetched to construct the final
-        # version of the file.
-        # basis_revisions is a list of versions that needs to be
-        # fetched but exists in the basis knit.
-        basis = self.basis_knit
-            if basis and basis._index.has_version(cursor):
-                basis_versions.append(cursor)
-            method = picked_knit._index.get_method(cursor)
-            needed_versions.append((method, cursor))
-            if method == 'fulltext':
-            cursor = picked_knit.get_parents(cursor)[0]
-            for comp_id in basis_versions:
-                data_pos, data_size = basis._index.get_data_position(comp_id)
-                records.append((piece_id, data_pos, data_size))
-            components.update(basis._data.read_records(records))
-        for comp_id in [vid for method, vid in needed_versions
-                        if vid not in basis_versions]:
-            data_pos, data_size = self._index.get_position(comp_id)
-            records.append((comp_id, data_pos, data_size))
-        components.update(self._data.read_records(records))
-        # get_data_records returns a mapping with the version id as
-        # index and the value as data. The order the components need
-        # to be applied is held by needed_versions (reversed).
-        for method, comp_id in reversed(needed_versions):
-            out.append((comp_id, method, components[comp_id]))
+        for version_id in version_ids:
+            while cursor is not None and cursor not in component_data:
+                method = self._index.get_method(cursor)
+                if method == 'fulltext':
+                    next = self.get_parents(cursor)[0]
+                data_pos, data_size = self._index.get_position(cursor)
+                component_data[cursor] = (method, data_pos, data_size, next)
+        return component_data
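# An illustrative example (revision ids invented) of the map built by
# _get_components_positions(): every version reachable along build-parents
# maps to (method, data_pos, data_size, next), and next is None once a
# fulltext terminates the chain.
#
#   self._get_components_positions(['rev-3'])
#   # {'rev-3': ('line-delta', 8312, 45,  'rev-2'),
#   #  'rev-2': ('line-delta', 8192, 120, 'rev-1'),
#   #  'rev-1': ('fulltext',   4096, 900, None)}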
     def _get_content(self, version_id, parent_texts={}):
         """Returns a content object that makes up the specified
         if cached_version is not None:
             return cached_version
-        if self.basis_knit and version_id in self.basis_knit:
-            return self.basis_knit._get_content(version_id)
-        components = self._get_components(version_id)
-        for component_id, method, (data, digest) in components:
-            version_idx = self._index.lookup(component_id)
-            if method == 'fulltext':
-                assert content is None
-                content = self.factory.parse_fulltext(data, version_idx)
-            elif method == 'line-delta':
-                delta = self.factory.parse_line_delta(data, version_idx)
-                content._lines = self._apply_delta(content._lines, delta)
-        if 'no-eol' in self._index.get_options(version_id):
-            line = content._lines[-1][1].rstrip('\n')
-            content._lines[-1] = (content._lines[-1][0], line)
-        if sha_strings(content.text()) != digest:
-            import pdb;pdb.set_trace()
-            raise KnitCorrupt(self.filename, 'sha-1 does not match %s' % version_id)
+        text_map, contents_map = self._get_content_maps([version_id])
+        return contents_map[version_id]

     def _check_versions_present(self, version_ids):
         """Check that all specified versions are present."""
     def _clone_text(self, new_version_id, old_version_id, parents):
         """See VersionedFile.clone_text()."""
-        # FIXME RBC 20060228 make fast by only inserting an index with null delta.
+        # FIXME RBC 20060228 make fast by only inserting an index with null
         self.add_lines(new_version_id, parents, self.get_lines(old_version_id))

     def get_lines(self, version_id):
         """See VersionedFile.get_lines()."""
-        return self._get_content(version_id).text()
-    def iter_lines_added_or_present_in_versions(self, version_ids=None):
+        return self.get_line_list([version_id])[0]

+    def _get_record_map(self, version_ids):
+        """Produce a dictionary of knit records.
+        The keys are version_ids, the values are tuples of (method, content,
+        method is the way the content should be applied.
+        content is a KnitContent object.
+        digest is the SHA1 digest of this version id after all steps are done
+        next is the build-parent of the version, i.e. the leftmost ancestor.
+        If the method is fulltext, next will be None.
+        position_map = self._get_components_positions(version_ids)
+        # c = component_id, m = method, p = position, s = size, n = next
+        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
+        for component_id, content, digest in \
+                self._data.read_records_iter(records):
+            method, position, size, next = position_map[component_id]
+            record_map[component_id] = method, content, digest, next
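# The corresponding record map (same invented ids as the earlier sketch) that
# _get_record_map() returns once the records have actually been read: raw
# record content and digest replace the file positions.
#
#   self._get_record_map(['rev-3'])
#   # {'rev-3': ('line-delta', <record lines>, <sha1>, 'rev-2'),
#   #  'rev-2': ('line-delta', <record lines>, <sha1>, 'rev-1'),
#   #  'rev-1': ('fulltext',   <record lines>, <sha1>, None)}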
+    def get_text(self, version_id):
+        """See VersionedFile.get_text"""
+        return self.get_texts([version_id])[0]

+    def get_texts(self, version_ids):
+        return [''.join(l) for l in self.get_line_list(version_ids)]

+    def get_line_list(self, version_ids):
+        """Return the texts of listed versions as a list of strings."""
+        text_map, content_map = self._get_content_maps(version_ids)
+        return [text_map[v] for v in version_ids]
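# How the text-retrieval entry points now stack (a summary of the calls shown
# above, with an invented knit instance): get_text() -> get_texts() ->
# get_line_list() -> _get_content_maps(), so one request for several versions
# shares a single pass over the knit records.
#
#   text = knit.get_text('rev-1')                   # one full text as a string
#   lines = knit.get_line_list(['rev-1', 'rev-2'])  # texts as lists of lines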
+    def _get_content_maps(self, version_ids):
+        """Produce maps of text and KnitContents
+        :return: (text_map, content_map) where text_map contains the texts for
+        the requested versions and content_map contains the KnitContents.
+        Both dicts take version_ids as their keys.
+        for version_id in version_ids:
+            if not self.has_version(version_id):
+                raise RevisionNotPresent(version_id, self.filename)
+        record_map = self._get_record_map(version_ids)
+        for version_id in version_ids:
+            while cursor is not None:
+                method, data, digest, next = record_map[cursor]
+                components.append((cursor, method, data, digest))
+                if cursor in content_map:
+            for component_id, method, data, digest in reversed(components):
+                if component_id in content_map:
+                    content = content_map[component_id]
+                    version_idx = self._index.lookup(component_id)
+                    if method == 'fulltext':
+                        assert content is None
+                        content = self.factory.parse_fulltext(data, version_idx)
+                    elif method == 'line-delta':
+                        delta = self.factory.parse_line_delta(data[:],
+                        content = content.copy()
+                        content._lines = self._apply_delta(content._lines,
+                    content_map[component_id] = content
+            if 'no-eol' in self._index.get_options(version_id):
+                content = content.copy()
+                line = content._lines[-1][1].rstrip('\n')
+                content._lines[-1] = (content._lines[-1][0], line)
+            final_content[version_id] = content
+            # digest here is the digest from the last applied component.
+            text = content.text()
+            if sha_strings(text) != digest:
+                raise KnitCorrupt(self.filename,
+                                  'sha-1 does not match %s' % version_id)
+            text_map[version_id] = text
+        return text_map, final_content
+    def iter_lines_added_or_present_in_versions(self, version_ids=None,
         """See VersionedFile.iter_lines_added_or_present_in_versions()."""
         if version_ids is None:
             version_ids = self.versions()
-        # we dont care about inclusions, the caller cares.
+            pb = progress.DummyProgress()
+        # we don't care about inclusions, the caller cares.
         # but we need to setup a list of records to visit.
         # we need version_id, position, length
         version_id_records = []
             data_pos, length = self._index.get_position(version_id)
             version_id_records.append((version_id, data_pos, length))
-        pb = bzrlib.ui.ui_factory.nested_progress_bar()
         total = len(version_id_records)
+        pb.update('Walking content.', count, total)
+        for version_id, data, sha_value in \
+            self._data.read_records_iter(version_id_records):
             pb.update('Walking content.', count, total)
-        for version_id, data, sha_value in \
-            self._data.read_records_iter(version_id_records):
-            pb.update('Walking content.', count, total)
-            method = self._index.get_method(version_id)
-            version_idx = self._index.lookup(version_id)
-            assert method in ('fulltext', 'line-delta')
-            if method == 'fulltext':
-                content = self.factory.parse_fulltext(data, version_idx)
-                for line in content.text():
+            method = self._index.get_method(version_id)
+            version_idx = self._index.lookup(version_id)
+            assert method in ('fulltext', 'line-delta')
+            if method == 'fulltext':
+                content = self.factory.parse_fulltext(data, version_idx)
+                for line in content.text():
+                delta = self.factory.parse_line_delta(data, version_idx)
+                for start, end, count, lines in delta:
+                    for origin, line in lines:
-                delta = self.factory.parse_line_delta(data, version_idx)
-                for start, end, count, lines in delta:
-                    for origin, line in lines:
-        pb.update('Walking content.', total, total)
-        pb.update('Walking content.', total, total)
+        pb.update('Walking content.', total, total)

     def num_versions(self):
         """See VersionedFile.num_versions()."""
         for lineno, insert_id, dset, line in w.walk(version_ids):
             yield lineno, insert_id, dset, line

+    def plan_merge(self, ver_a, ver_b):
+        """See VersionedFile.plan_merge."""
+        ancestors_b = set(self.get_ancestry(ver_b))
+        def status_a(revision, text):
+            if revision in ancestors_b:
+                return 'killed-b', text
+        ancestors_a = set(self.get_ancestry(ver_a))
+        def status_b(revision, text):
+            if revision in ancestors_a:
+                return 'killed-a', text
+        annotated_a = self.annotate(ver_a)
+        annotated_b = self.annotate(ver_b)
+        plain_a = [t for (a, t) in annotated_a]
+        plain_b = [t for (a, t) in annotated_b]
+        blocks = KnitSequenceMatcher(None, plain_a, plain_b).get_matching_blocks()
+        for ai, bi, l in blocks:
+            # process all mismatched sections
+            # (last mismatched section is handled because blocks always
+            # includes a 0-length last block)
+            for revision, text in annotated_a[a_cur:ai]:
+                yield status_a(revision, text)
+            for revision, text in annotated_b[b_cur:bi]:
+                yield status_b(revision, text)
+            # and now the matched section
+            for text_a, text_b in zip(plain_a[ai:a_cur], plain_b[bi:b_cur]):
+                assert text_a == text_b
+                yield "unchanged", text_a
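# A hedged sketch (revisions invented) of the tuples plan_merge() yields:
# every line of the two annotated texts is labelled for the merge machinery,
# 'unchanged' for matching lines and 'killed-a'/'killed-b' for lines whose
# origin revision is already in the other side's ancestry.
#
#   for state, line in knit.plan_merge('rev-a', 'rev-b'):
#       print state, repr(line)
#   # unchanged 'a line both sides agree on\n'
#   # killed-b  'a line side b has already superseded\n'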
 class _KnitComponentFile(object):
     """One of the files used to implement a knit database"""

-    def __init__(self, transport, filename, mode):
+    def __init__(self, transport, filename, mode, file_mode=None,
+                 create_parent_dir=False, dir_mode=None):
         self._transport = transport
         self._filename = filename
         self._mode = mode
-    def write_header(self):
-        if self._transport.append(self._filename, StringIO(self.HEADER)):
-            raise KnitCorrupt(self._filename, 'misaligned after writing header')
+        self._file_mode = file_mode
+        self._dir_mode = dir_mode
+        self._create_parent_dir = create_parent_dir
+        self._need_to_create = False

     def check_header(self, fp):
         line = fp.readline()
-    def __init__(self, transport, filename, mode, create=False):
-        _KnitComponentFile.__init__(self, transport, filename, mode)
+    def __init__(self, transport, filename, mode, create=False, file_mode=None,
+                 create_parent_dir=False, delay_create=False, dir_mode=None):
+        _KnitComponentFile.__init__(self, transport, filename, mode,
+                                    file_mode=file_mode,
+                                    create_parent_dir=create_parent_dir,
         self._cache = {}
         # position in _history is the 'official' index for a revision
         # but the values may have come from a newer entry.
-        # so - wc -l of a knit index is != the number of uniqe names
+        # so - wc -l of a knit index is != the number of unique names
         self._history = []
         pb = bzrlib.ui.ui_factory.nested_progress_bar()
                 pb.update('read knit index', count, total)
                 fp = self._transport.get(self._filename)
-                self.check_header(fp)
-                # readlines reads the whole file at once:
-                # bad for transports like http, good for local disk
-                # we save 60 ms doing this one change (
-                # from calling readline each time to calling
-                # probably what we want for nice behaviour on
-                # http is a incremental readlines that yields, or
-                # a check for local vs non local indexes,
-                for l in fp.readlines():
-                    if len(rec) < 5 or rec[-1] != ':':
-                        # FIXME: in the future we should determine if its a
-                        # short write - and ignore it
-                        # or a different failure, and raise. RBC 20060407
-                    #pb.update('read knit index', count, total)
-                    # See self._parse_parents
-                    for value in rec[4:-1]:
-                            # uncompressed reference
-                            parents.append(value[1:])
+                self.check_header(fp)
+                # readlines reads the whole file at once:
+                # bad for transports like http, good for local disk
+                # we save 60 ms doing this one change (
+                # from calling readline each time to calling
+                # probably what we want for nice behaviour on
+                # http is a incremental readlines that yields, or
+                # a check for local vs non local indexes,
+                for l in fp.readlines():
+                    if len(rec) < 5 or rec[-1] != ':':
+                        # FIXME: in the future we should determine if its a
+                        # short write - and ignore it
+                        # or a different failure, and raise. RBC 20060407
+                    #pb.update('read knit index', count, total)
+                    # See self._parse_parents
+                    for value in rec[4:-1]:
+                            # uncompressed reference
+                            parents.append(value[1:])
+                            # this is 15/4000ms faster than isinstance,
+                            # this function is called thousands of times a
+                            # second so small variations add up.
+                            assert value.__class__ is str
+                            parents.append(self._history[int(value)])
+                    # end self._parse_parents
+                    # self._cache_version(rec[0],
+                    #                     rec[1].split(','),
+                    # --- self._cache_version
+                    # only want the _history index to reference the 1st
+                    # index entry for version_id
+                    if version_id not in self._cache:
+                        index = len(self._history)
+                        self._history.append(version_id)
-                            # this is 15/4000ms faster than isinstance,
-                            # this function is called thousands of times a
-                            # second so small variations add up.
-                            assert value.__class__ is str
-                            parents.append(self._history[int(value)])
-                    # end self._parse_parents
-                    # self._cache_version(rec[0],
-                    # --- self._cache_version
-                    # only want the _history index to reference the 1st
-                    # index entry for version_id
-                    if version_id not in self._cache:
-                        index = len(self._history)
-                        self._history.append(version_id)
-                        index = self._cache[version_id][5]
-                    self._cache[version_id] = (version_id,
-                    # --- self._cache_version
+                        index = self._cache[version_id][5]
+                    self._cache[version_id] = (version_id,
+                    # --- self._cache_version
         except NoSuchFile, e:
             if mode != 'w' or not create:
+                self._need_to_create = True
+                self._transport.put_bytes_non_atomic(self._filename,
+                    self.HEADER, mode=self._file_mode)
             pb.update('read knit index', total, total)
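# A hedged illustration (record values invented) of how the parent column of
# an index line is decoded by the loop above: rec[4:-1] holds the parents, a
# value starting with '.' is a literal version id, anything else is an
# integer offset into self._history, and the trailing ':' only terminates
# the record.
#
#   rec = ['rev-3', 'line-delta', '8312', '45', '0', '.rev-ghost', ':']
#   # parents decode to [self._history[0], 'rev-ghost']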
                 result_list.append(str(self._cache[version][5]))
         # -- end lookup () --
-                result_list.append('.' + version.encode('utf-8'))
+                result_list.append('.' + encode_utf8(version))
         return ' '.join(result_list)

     def add_version(self, version_id, options, pos, size, parents):
         """Add a version record to the index."""
-        self._cache_version(version_id, options, pos, size, parents)
-        content = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
-                                          self._version_list_to_index(parents))
-        assert isinstance(content, str), 'content must be utf-8 encoded'
-        self._transport.append(self._filename, StringIO(content))
+        self.add_versions(((version_id, options, pos, size, parents),))

+    def add_versions(self, versions):
+        """Add multiple versions to the index.
+        :param versions: a list of tuples:
+                         (version_id, options, pos, size, parents).
+        encode_utf8 = cache_utf8.encode
+        for version_id, options, pos, size, parents in versions:
+            line = "\n%s %s %s %s %s :" % (encode_utf8(version_id),
+                                           self._version_list_to_index(parents))
+            assert isinstance(line, str), \
+                'content must be utf-8 encoded: %r' % (line,)
+        if not self._need_to_create:
+            self._transport.append_bytes(self._filename, ''.join(lines))
+            sio.write(self.HEADER)
+            sio.writelines(lines)
+            self._transport.put_file_non_atomic(self._filename, sio,
+                        create_parent_dir=self._create_parent_dir,
+                        mode=self._file_mode,
+                        dir_mode=self._dir_mode)
+            self._need_to_create = False
+        # cache after writing, so that a failed write leads to missing cache
+        # entries not extra ones. XXX TODO: RBC 20060502 in the event of a
+        # failure, reload the index or flush it or some such, to prevent
+        # writing records that did complete twice.
+        for version_id, options, pos, size, parents in versions:
+            self._cache_version(version_id, options, pos, size, parents)
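# A hedged sketch (values invented) of the line add_versions() appends for
# each record: version id, comma-joined options, data position, data size,
# then the parents as encoded by _version_list_to_index(), terminated by ':'.
# The leading '\n' keeps every record on its own line.
#
#   ('rev-2', ['line-delta', 'no-eol'], 8312, 45, ['rev-1'])
#   # -> '\nrev-2 line-delta,no-eol 8312 45 0 :'   (0 = rev-1's _history index)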
     def has_version(self, version_id):
         """True if the version is in the index."""
-        return self._cache.has_key(version_id)
+        return (version_id in self._cache)

     def get_position(self, version_id):
         """Return data position and size of specified version."""
 class _KnitData(_KnitComponentFile):
     """Contents of the knit data file"""

     HEADER = "# bzr knit data 7\n"

-    def __init__(self, transport, filename, mode, create=False):
-        _KnitComponentFile.__init__(self, transport, filename, mode)
+    def __init__(self, transport, filename, mode, create=False, file_mode=None,
+                 create_parent_dir=False, delay_create=False,
+        _KnitComponentFile.__init__(self, transport, filename, mode,
+                                    file_mode=file_mode,
+                                    create_parent_dir=create_parent_dir,
         self._checked = False
+        # TODO: jam 20060713 conceptually, this could spill to disk
+        #       if the cached size gets larger than a certain amount
+        #       but it complicates the model a bit, so for now just use
+        #       a simple dictionary
+        self._do_cache = False
-            self._transport.put(self._filename, StringIO(''))
+                self._need_to_create = create
+                self._transport.put_bytes_non_atomic(self._filename, '',
+                                                     mode=self._file_mode)

+    def enable_cache(self):
+        """Enable caching of reads."""
+        self._do_cache = True

     def clear_cache(self):
         """Clear the record cache."""
+        self._do_cache = False

     def _open_file(self):
-        if self._file is None:
-            self._file = self._transport.get(self._filename)
+        return self._transport.get(self._filename)
1365
def _record_to_data(self, version_id, digest, lines):
1173
1366
"""Convert version_id, digest, lines into a raw data block.
1189
1384
return length, sio
1191
1386
def add_raw_record(self, raw_data):
1192
"""Append a prepared record to the data file."""
1387
"""Append a prepared record to the data file.
1389
:return: the offset in the data file raw_data was written.
1193
1391
assert isinstance(raw_data, str), 'data must be plain bytes'
1194
start_pos = self._transport.append(self._filename, StringIO(raw_data))
1195
return start_pos, len(raw_data)
1392
if not self._need_to_create:
1393
return self._transport.append_bytes(self._filename, raw_data)
1395
self._transport.put_bytes_non_atomic(self._filename, raw_data,
1396
create_parent_dir=self._create_parent_dir,
1397
mode=self._file_mode,
1398
dir_mode=self._dir_mode)
1399
self._need_to_create = False
1197
1402
def add_record(self, version_id, digest, lines):
1198
1403
"""Write new text record to disk. Returns the position in the
1199
1404
file where it was written."""
1200
1405
size, sio = self._record_to_data(version_id, digest, lines)
1202
self._records[version_id] = (digest, lines)
1203
1406
# write to disk
1204
start_pos = self._transport.append(self._filename, sio)
1407
if not self._need_to_create:
1408
start_pos = self._transport.append_file(self._filename, sio)
1410
self._transport.put_file_non_atomic(self._filename, sio,
1411
create_parent_dir=self._create_parent_dir,
1412
mode=self._file_mode,
1413
dir_mode=self._dir_mode)
1414
self._need_to_create = False
1417
self._cache[version_id] = sio.getvalue()
1205
1418
return start_pos, size
     def _parse_record_header(self, version_id, raw_data):

         This unpacks enough of the text record to validate the id is
         as expected but thats all.
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
-        for version_id, pos, size in records:
-            if version_id not in self._records:
-                needed_records.append((version_id, pos, size))
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
-        if len(needed_records):
             # grab the disk data needed.
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in needed_records])
+                # Don't check _cache if it is empty
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              if version_id not in self._cache]
+                needed_offsets = [(pos, size) for version_id, pos, size
+            raw_records = self._transport.readv(self._filename, needed_offsets)
         for version_id, pos, size in records:
-            if version_id in self._records:
-                # compress a new version
-                size, sio = self._record_to_data(version_id,
-                                                 self._records[version_id][0],
-                                                 self._records[version_id][1])
-                yield version_id, sio.getvalue()
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
                 pos, data = raw_records.next()
+                    self._cache[version_id] = data
             # validate the header
             df, rec = self._parse_record_header(version_id, data)
-                yield version_id, data
+            yield version_id, data
     def read_records_iter(self, records):
         """Read text records from data file and yield result.
-        Each passed record is a tuple of (version_id, pos, len) and
-        will be read in the given order. Yields (version_id,
+        The result will be returned in whatever is the fastest to read.
+        Not by the order requested. Also, multiple requests for the same
+        record will only yield 1 response.
+        :param records: A list of (version_id, pos, len) entries
+        :return: Yields (version_id, contents, digest) in the order
+                 read, not the order requested
-        # 60890 calls for 4168 extractions in 5045, 683 internal.
-        # 4168 calls to readv in 1411
-        # 4168 calls to parse_record in 2880
-        for version_id, pos, size in records:
-            if version_id not in self._records:
-                needed_records.append((version_id, pos, size))
-        if len(needed_records):
-            # We take it that the transport optimizes the fetching as good
-            # as possible (ie, reads continous ranges.)
-            response = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in needed_records])
-            for (record_id, pos, size), (pos, data) in izip(iter(needed_records), response):
-                content, digest = self._parse_record(record_id, data)
-                self._records[record_id] = (digest, content)
-        for version_id, pos, size in records:
-            yield version_id, list(self._records[version_id][1]), self._records[version_id][0]
+            # Skip records we have alread seen
+            yielded_records = set()
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    if record[0] in yielded_records:
+                    yielded_records.add(record[0])
+                    data = self._cache[record[0]]
+                    content, digest = self._parse_record(record[0], data)
+                    yield (record[0], content, digest)
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+            needed_records = sorted(set(records), key=operator.itemgetter(1))
+        if not needed_records:
+        # The transport optimizes the fetching as well
+        # (ie, reads continuous ranges.)
+        readv_response = self._transport.readv(self._filename,
+            [(pos, size) for version_id, pos, size in needed_records])
+        for (version_id, pos, size), (pos, data) in \
+                izip(iter(needed_records), readv_response):
+            content, digest = self._parse_record(version_id, data)
+                self._cache[version_id] = data
+            yield version_id, content, digest

     def read_records(self, records):
         """Read records into a dictionary."""
         components = {}
-        for record_id, content, digest in self.read_records_iter(records):
+        for record_id, content, digest in \
+                self.read_records_iter(records):
             components[record_id] = (content, digest)
         return components
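# A usage note on the read_records_iter() contract documented above, with an
# invented _KnitData instance: results arrive in whatever order is cheapest
# to read (offsets are sorted before the readv) and duplicates yield once,
# so callers needing keyed access go through read_records():
#
#   components = data_file.read_records([('rev-1', 4096, 900),
#                                        ('rev-2', 8312, 45)])
#   content, digest = components['rev-2']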
 InterVersionedFile.register_optimiser(InterKnit)

-class SequenceMatcher(difflib.SequenceMatcher):
+class WeaveToKnit(InterVersionedFile):
+    """Optimised code paths for weave to knit operations."""

+    _matching_file_from_factory = bzrlib.weave.WeaveFile
+    _matching_file_to_factory = KnitVersionedFile

+    def is_compatible(source, target):
+        """Be compatible with weaves to knits."""
+            return (isinstance(source, bzrlib.weave.Weave) and
+                    isinstance(target, KnitVersionedFile))
+        except AttributeError:

+    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
+        """See InterVersionedFile.join."""
+        assert isinstance(self.source, bzrlib.weave.Weave)
+        assert isinstance(self.target, KnitVersionedFile)
+        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
+        pb = bzrlib.ui.ui_factory.nested_progress_bar()
+            version_ids = list(version_ids)
+            self.source_ancestry = set(self.source.get_ancestry(version_ids))
+            this_versions = set(self.target._index.get_versions())
+            needed_versions = self.source_ancestry - this_versions
+            cross_check_versions = self.source_ancestry.intersection(this_versions)
+            mismatched_versions = set()
+            for version in cross_check_versions:
+                # scan to include needed parents.
+                n1 = set(self.target.get_parents_with_ghosts(version))
+                n2 = set(self.source.get_parents(version))
+                # if all of n2's parents are in n1, then its fine.
+                if n2.difference(n1):
+                    # FIXME TEST this check for cycles being introduced works
+                    # the logic is we have a cycle if in our graph we are an
+                    # ancestor of any of the n2 revisions.
+                        parent_ancestors = self.source.get_ancestry(parent)
+                        if version in parent_ancestors:
+                            raise errors.GraphCycleError([parent, version])
+                    # ensure this parent will be available later.
+                    new_parents = n2.difference(n1)
+                    needed_versions.update(new_parents.difference(this_versions))
+                    mismatched_versions.add(version)

+            if not needed_versions and not mismatched_versions:
+            full_list = topo_sort(self.source.get_graph())
+            version_list = [i for i in full_list if (not self.target.has_version(i)
+                            and i in needed_versions)]

+            total = len(version_list)
+            for version_id in version_list:
+                pb.update("Converting to knit", count, total)
+                parents = self.source.get_parents(version_id)
+                # check that its will be a consistent copy:
+                for parent in parents:
+                    # if source has the parent, we must already have it
+                    assert (self.target.has_version(parent))
+                self.target.add_lines(
+                    version_id, parents, self.source.get_lines(version_id))

+            for version in mismatched_versions:
+                # FIXME RBC 20060309 is this needed?
+                n1 = set(self.target.get_parents_with_ghosts(version))
+                n2 = set(self.source.get_parents(version))
+                # write a combined record to our history preserving the current
+                # parents as first in the list
+                new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
+                self.target.fix_parents(version, new_parents)

+InterVersionedFile.register_optimiser(WeaveToKnit)
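# A hedged sketch of how the optimiser registered above is meant to be used
# (the registry lookup call is assumed, not shown in this diff): callers go
# through InterVersionedFile rather than instantiating WeaveToKnit directly,
# and join() copies every needed weave version into the knit in topological
# order.
#
#   inter = InterVersionedFile.get(weave_file, knit_file)  # -> WeaveToKnit
#   inter.join(pb=None, version_ids=None, ignore_missing=False)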
+class KnitSequenceMatcher(difflib.SequenceMatcher):
     """Knit tuned sequence matcher.

     This is based on profiling of difflib which indicated some improvements