DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'


class KnitAdapter(object):
    """Base class for knit record adaptation."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = _KnitData(None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
        return bytes


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
        return bytes


class FTAnnotatedToFullText(KnitAdapter):
    """An adapter from FT annotated knits to plain fulltexts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content, delta = self._annotate_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaAnnotatedToFullText(KnitAdapter):
    """An adapter from annotated deltas to plain fulltexts."""

    def get_bytes(self, factory, annotated_compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        # Manually apply the delta because we have one annotated content and
        # one plain.
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        basis_content.apply_delta(delta, rec[1])
        basis_content._should_strip_eol = factory._build_details[1]
        return ''.join(basis_content.text())


class FTPlainToFullText(KnitAdapter):
    """An adapter from FT plain knits to plain fulltexts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        content, delta = self._plain_factory.parse_record(factory.key[0],
            contents, factory._build_details, None)
        return ''.join(content.text())


class DeltaPlainToFullText(KnitAdapter):
    """An adapter from plain deltas to plain fulltexts."""

    def get_bytes(self, factory, compressed_bytes):
        rec, contents = \
            self._data._parse_record_unchecked(compressed_bytes)
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
        compression_parent = factory.parents[0][0]
        basis_lines = self._basis_vf.get_lines(compression_parent)
        basis_content = PlainKnitContent(basis_lines, compression_parent)
        # Let parse_record rebuild the fulltext against the plain basis
        # content.
        content, _ = self._plain_factory.parse_record(rec[1], contents,
            factory._build_details, basis_content)
        return ''.join(content.text())
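
# Hedged usage sketch (not part of the original module): the adapters above
# all share one calling convention.  Given the content factory describing a
# record and that record's raw gzipped bytes, get_bytes returns the record
# re-encoded in the adapter's target form:
#
#   adapter = FTAnnotatedToUnannotated(None)
#   plain_bytes = adapter.get_bytes(factory, annotated_bytes)
#
# The delta-to-fulltext adapters additionally need basis_vf so that they can
# fetch the text each delta was made against.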


class KnitContentFactory(ContentFactory):
    """Content factory for streaming from knits.

    :seealso ContentFactory:
    """

    def __init__(self, version, parents, build_details, sha1, raw_record,
        annotated, knit=None):
        """Create a KnitContentFactory for version.

        :param version: The version.
        :param parents: The parents.
        :param build_details: The build details as returned from
            get_build_details.
        :param sha1: The sha1 expected from the full text of this object.
        :param raw_record: The bytes of the knit data from disk.
        :param annotated: True if the raw data is annotated.
        """
        ContentFactory.__init__(self)
        self.sha1 = sha1
        self.key = (version,)
        self.parents = tuple((parent,) for parent in parents)
        if build_details[0] == 'line-delta':
            kind = 'delta'
        else:
            kind = 'ft'
        if annotated:
            annotated_kind = 'annotated-'
        else:
            annotated_kind = ''
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
        self._raw_record = raw_record
        self._build_details = build_details
        self._knit = knit

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._raw_record
        if storage_kind == 'fulltext' and self._knit is not None:
            return self._knit.get_text(self.key[0])
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)
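
    # Note: storage_kind is built as 'knit-%s%s-gz', so the concrete values
    # are 'knit-ft-gz', 'knit-delta-gz', 'knit-annotated-ft-gz' and
    # 'knit-annotated-delta-gz'; insert_record_stream below matches against
    # exactly these strings.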


class KnitContent(object):
    """Content of a knit version to which deltas can be applied."""

    def __init__(self):
        self._should_strip_eol = False

    def annotate(self):
        """Return a list of (origin, text) tuples."""
        return list(self.annotate_iter())

    def apply_delta(self, delta, new_version_id):
        """Apply delta to this object to become new_version_id."""
        raise NotImplementedError(self.apply_delta)
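
    # Illustrative note (hedged sketch, not from the original file): a line
    # delta as handled by apply_delta is a sequence of (start, end, count,
    # lines) hunks, meaning "replace lines[start:end] with these `count`
    # replacement lines".  Applying one looks roughly like:
    #
    #   offset = 0
    #   for start, end, count, lines in delta:
    #       target[offset + start:offset + end] = lines
    #       offset += count - (end - start)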

            out.extend(lines)

    def annotate(self, knit, version_id):
        annotator = _KnitAnnotator(knit)
        return annotator.annotate(version_id)


def make_empty_knit(transport, relpath):
    """Construct an empty knit at the specified location."""
    k = make_file_knit(relpath, transport, access_mode='w',
        factory=KnitPlainFactory(), create=True)


def make_file_knit(name, transport, file_mode=None, access_mode='w',
    factory=None, delta=True, create=False, create_parent_dir=False,
    delay_create=False, dir_mode=None, get_scope=None):
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
    if factory is None:
        factory = KnitAnnotateFactory()
    if get_scope is None:
        get_scope = lambda:None
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
        access_mode, create=create, file_mode=file_mode,
        create_parent_dir=create_parent_dir, delay_create=delay_create,
        dir_mode=dir_mode, get_scope=get_scope)
    access = _KnitAccess(transport, name + DATA_SUFFIX, file_mode,
        dir_mode, ((create and not len(index)) and delay_create),
        create_parent_dir)
    return KnitVersionedFile(name, transport, factory=factory,
        create=create, delay_create=delay_create, index=index,
        access_method=access)


def get_suffixes():
    """Return the suffixes used by file based knits."""
    return [DATA_SUFFIX, INDEX_SUFFIX]
make_file_knit.get_suffixes = get_suffixes
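
# A minimal usage sketch (the name and transport here are illustrative
# assumptions, not part of the module):
#
#   from bzrlib.transport import get_transport
#   knit = make_file_knit('example', get_transport('.'), create=True)
#   knit.add_lines('rev-1', [], ['first line\n'])
#
# This pairs 'example' + DATA_SUFFIX ('.knit') for the data file with
# 'example' + INDEX_SUFFIX ('.kndx') for the index.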


class KnitVersionedFile(VersionedFile):
    """Weave-like structure with faster random access."""

    def __init__(self, relpath, transport, file_mode=None,
        factory=None, delta=True, create=False, create_parent_dir=False,
        delay_create=False, dir_mode=None, index=None, access_method=None):
        """Construct a knit at location specified by relpath.

        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        :param index: An index to use for the knit.
        """
        super(KnitVersionedFile, self).__init__()
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.delta = delta

        self._max_delta_chain = 200

        if None in (access_method, index):
            raise ValueError("No default access_method or index any more")
        self._index = index
        _access = access_method
        if create and not len(self) and not delay_create:
            _access.create()
        self._data = _KnitData(_access)

                # put them in anywhere, but we hope that sending them soon
                # after the fulltext will give good locality in the receiver
                ready_to_send[:0] = deferred.pop(version_id)
        if not (len(deferred) == 0):
            raise AssertionError("Still have compressed child versions "
                "waiting to be sent")
        # XXX: The stream format is such that we cannot stream it - we have to
        # know the length of all the data a-priori.
        raw_datum = []
        result_version_list = []
        for (version_id, raw_data, _), \
            (version_id2, options, _, parents) in \
            izip(self._data.read_records_iter_raw(copy_queue_records),
                 temp_version_list):
            if not (version_id == version_id2):
                raise AssertionError('logic error, inconsistent results')
            raw_datum.append(raw_data)
            result_version_list.append(
                (version_id, options, len(raw_data), parents))
        pseudo_file = StringIO(''.join(raw_datum))
        def read(length):
            if length is None:
                return pseudo_file.read()
            else:
                return pseudo_file.read(length)
        return (self.get_format_signature(), result_version_list, read)
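
    # The tuple returned above is the knit data-stream contract consumed by
    # insert_data_stream below: (format_signature, list of (version_id,
    # options, length, parents) entries, read(length) callable).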

    def get_record_stream(self, versions, ordering, include_delta_closure):
        """Get a stream of records for versions.

        :param versions: The versions to include. Each version is a tuple
            (version,).
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        if include_delta_closure:
            # Nb: what we should do is plan the data to stream to allow
            # reconstruction of all the texts without excessive buffering,
            # including re-sending common bases as needed. This makes the most
            # sense when we start serialising these streams though, so for now
            # we just fallback to individual text construction behind the
            # abstraction barrier.
            knit = self
        else:
            knit = None
        # We end up doing multiple index lookups here for parents details and
        # disk layout details - we need a unified api ?
        parent_map = self.get_parent_map(versions)
        absent_versions = set(versions) - set(parent_map)
        if ordering == 'topological':
            present_versions = topo_sort(parent_map)
        else:
            # List comprehension to keep the requested order (as that seems
            # marginally useful, at least until we start doing IO optimising
            # here).
            present_versions = [version for version in versions if version in
                parent_map]
        position_map = self._get_components_positions(present_versions)
        records = [(version, position_map[version][1]) for version in
            present_versions]
        for version in absent_versions:
            yield AbsentContentFactory((version,))
        for version, raw_data, sha1 in \
                self._data.read_records_iter_raw(records):
            (record_details, index_memo, _) = position_map[version]
            yield KnitContentFactory(version, parent_map[version],
                record_details, sha1, raw_data, self.factory.annotated, knit)
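
    # Hedged consumption sketch (hypothetical caller code): each factory
    # yielded above is only valid until the iterator advances, so callers
    # materialise the bytes immediately:
    #
    #   for factory in knit.get_record_stream(versions, 'topological', True):
    #       if factory.storage_kind == 'absent':
    #           raise errors.RevisionNotPresent(factory.key, knit)
    #       bytes = factory.get_bytes_as(factory.storage_kind)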

    def _extract_blocks(self, version_id, source, target):
        if self._index.get_method(version_id) != 'line-delta':
            return None
        parent, sha1, noeol, delta = self.get_delta(version_id)
        return KnitContent.get_line_delta_blocks(delta, source, target)

    def get_format_signature(self):
        """See VersionedFile.get_format_signature()."""
        if self.factory.annotated:
            annotated_part = "annotated"
        else:
            annotated_part = "plain"
        return "knit-%s" % (annotated_part,)

    @deprecated_method(one_four)
    def get_graph_with_ghosts(self):
        """See VersionedFile.get_graph_with_ghosts()."""
        return self.get_parent_map(self.versions())

    def get_sha1(self, version_id):
        return self.get_sha1s([version_id])[0]

    def get_sha1s(self, version_ids):
        """See VersionedFile.get_sha1s()."""
        record_map = self._get_record_map(version_ids)
        # record entry 2 is the 'digest'.
        return [record_map[v][2] for v in version_ids]

    @staticmethod
    def get_suffixes():
        """See VersionedFile.get_suffixes()."""
        return [DATA_SUFFIX, INDEX_SUFFIX]

    @deprecated_method(one_four)
    def has_ghost(self, version_id):
        """True if there is a ghost reference in the file to version_id."""
        # maybe we have it
        if self.has_version(version_id):
            return False
        # optimisable if needed by memoising the _ghosts set.
        items = self.get_parent_map(self.versions())
        for parents in items.itervalues():
            for parent in parents:
                if parent == version_id and parent not in items:
                    return True
        return False

    def insert_data_stream(self, (format, data_list, reader_callable)):
        """Insert knit records from a data stream into this knit."""
                        'on the source repository, and "bzr reconcile" '
                        'if necessary.' %
                        (version_id, parents[0]))
                if options[0] == 'line-delta' and not self.delta:
                    # We received a line-delta record for a non-delta knit.
                    # Convert it to a fulltext.
                    gzip_bytes = reader_callable(length)
                    self._convert_line_delta_to_fulltext(
                        gzip_bytes, version_id, parents)
                    continue
                self._add_raw_records(
                    [(version_id, options, parents, length)],
                    reader_callable(length))

    def _convert_line_delta_to_fulltext(self, gzip_bytes, version_id, parents):
        lines, sha1 = self._data._parse_record(version_id, gzip_bytes)
        delta = self.factory.parse_line_delta(lines, version_id)
        content = self.factory.make(self.get_lines(parents[0]), parents[0])
        content.apply_delta(delta, version_id)
        digest, _, _ = self.add_lines(
            version_id, parents, content.text())
        if digest != sha1:
            raise errors.VersionedFileInvalidChecksum(version_id)

    def _knit_from_datastream(self, (format, data_list, reader_callable)):
        """Create a knit object from a data stream."""
        return KnitVersionedFile(self.filename, self.transport,
            factory=factory, index=index, access_method=access)

    def insert_record_stream(self, stream):
        """Insert a record stream into this versioned file.

        :param stream: A stream of records to insert.
        :return: None
        :seealso VersionedFile.get_record_stream:
        """
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        if self.factory.annotated:
            # self is annotated, we need annotated knits to use directly.
            annotated = "annotated-"
            convertibles = []
        else:
            # self is not annotated, but we can strip annotations cheaply.
            annotated = ""
            convertibles = set(["knit-annotated-delta-gz",
                "knit-annotated-ft-gz"])
        # The set of types we can cheaply adapt without needing basis texts.
        native_types = set()
        native_types.add("knit-%sdelta-gz" % annotated)
        native_types.add("knit-%sft-gz" % annotated)
        knit_types = native_types.union(convertibles)
        adapters = {}
        # Buffer all index entries that we can't add immediately because their
        # basis parent is missing. We don't buffer all because generating
        # annotations may require access to some of the new records. However we
        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # includes the new keys.
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key[0]], self)
            # adapt to non-tuple interface
            parents = [parent[0] for parent in record.parents]
            if record.storage_kind in knit_types:
                if record.storage_kind not in native_types:
                    try:
                        adapter_key = (record.storage_kind, "knit-delta-gz")
                        adapter = get_adapter(adapter_key)
                    except KeyError:
                        adapter_key = (record.storage_kind, "knit-ft-gz")
                        adapter = get_adapter(adapter_key)
                    bytes = adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind))
                else:
                    bytes = record.get_bytes_as(record.storage_kind)
                options = [record._build_details[0]]
                if record._build_details[1]:
                    options.append('no-eol')
                # Just blat it across.
                # Note: This does end up adding data on duplicate keys. As
                # modern repositories use atomic insertions this should not
                # lead to excessive growth in the event of interrupted fetches.
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed if
                # needed by making the kndx index support raise on a duplicate
                # add with identical parents and options.
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
                index_entry = (record.key[0], options, access_memo, parents)
                buffered = False
                if 'fulltext' not in options:
                    basis_parent = parents[0]
                    if not self.has_version(basis_parent):
                        pending = buffered_index_entries.setdefault(
                            basis_parent, [])
                        pending.append(index_entry)
                        buffered = True
                if not buffered:
                    self._index.add_versions([index_entry])
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key[0], parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(
                    record, record.get_bytes_as(record.storage_kind)))
                try:
                    self.add_lines(record.key[0], parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
            # Add any records whose basis parent is now available.
            added_keys = [record.key[0]]
            while added_keys:
                key = added_keys.pop(0)
                if key in buffered_index_entries:
                    index_entries = buffered_index_entries[key]
                    self._index.add_versions(index_entries)
                    added_keys.extend(
                        [index_entry[0] for index_entry in index_entries])
                    del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
                self)
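
    # Hedged round-trip sketch (hypothetical caller code): the stream one
    # knit produces can be fed straight into another, with the adapters above
    # bridging any annotated/plain mismatch:
    #
    #   target.insert_record_stream(
    #       source.get_record_stream(source.versions(), 'topological', False))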

    def versions(self):
        """See VersionedFile.versions."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "versions scales with size of history")
        return self._index.get_versions()

    def _clone_text(self, new_version_id, old_version_id, parents):
        """See VersionedFile.clone_text()."""
        # FIXME RBC 20060228 make fast by only inserting an index with null
        # delta.
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))

    def check(self, progress_bar=None):
        """See VersionedFile.check()."""
        # This doesn't actually test extraction of everything, but that will
        # impact 'bzr check' substantially, and needs to be integrated with
        # care. However, it does check for the obvious problem of a delta with
        # no basis.
        versions = self.versions()
        parent_map = self.get_parent_map(versions)
        for version in versions:
            if self._index.get_method(version) != 'fulltext':
                compression_parent = parent_map[version][0]
                if compression_parent not in parent_map:
                    raise errors.KnitCorrupt(self,
                        "Missing basis parent %s for %s" % (
                        compression_parent, version))

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

            pb.update('Walking content.', total, total)

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        return self._index.iter_parents(version_ids)

    def num_versions(self):
        """See VersionedFile.num_versions()."""
        return self._index.num_versions()

    __len__ = num_versions

    def annotate(self, version_id):
        """See VersionedFile.annotate."""
        return self.factory.annotate(self, version_id)

    def get_parent_map(self, version_ids):
        """See VersionedFile.get_parent_map."""
        return self._index.get_parent_map(version_ids)

                parents, (method, noeol))

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        parent_map = self.get_parent_map(version_ids)
        parent_map_set = set(parent_map)
        unknown_existence = set()
        for parents in parent_map.itervalues():
            unknown_existence.update(parents)
        unknown_existence.difference_update(parent_map_set)
        present_parents = set(self.get_parent_map(unknown_existence))
        present_parents.update(parent_map_set)
        for version_id, parents in parent_map.iteritems():
            parents = tuple(parent for parent in parents
                if parent in present_parents)
            yield version_id, parents

    def num_versions(self):
        return len(self._history)

            return 'fulltext'

    def iter_parents(self, version_ids):
        """Iterate through the parents for many version ids.

        :param version_ids: An iterable yielding version_ids.
        :return: An iterator that yields (version_id, parents). Requested
            version_ids not present in the versioned file are simply skipped.
            The order is undefined, allowing for different optimisations in
            the underlying implementation.
        """
        if self._parents:
            all_nodes = set(self._get_entries(self._version_ids_to_keys(version_ids)))
            all_parents = set()
            present_parents = set()
            for node in all_nodes:
                all_parents.update(node[3][0])
                # any node we are querying must be present
                present_parents.add(node[1])
            unknown_parents = all_parents.difference(present_parents)
            present_parents.update(self._present_keys(unknown_parents))
            for node in all_nodes:
                parents = []
                for parent in node[3][0]:
                    if parent in present_parents:
                        parents.append(parent[0])
                yield node[1][0], tuple(parents)
        else:
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
                yield node[1][0], ()
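
    # In the graph-index nodes iterated above, node[1] is the key tuple
    # (version_id,) and node[3][0] holds the parent key references, which is
    # why results are unwrapped as node[1][0] and parent[0].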

    def num_versions(self):
        return len(list(self._graph_index.iter_all_entries()))

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for the records.

        :param memos_for_retrieval: An iterable of memos from the
            _StreamIndex object identifying bytes to read; for these classes
            they are (from_backing_knit, index, start, end) and can point to
            either the backing knit or streamed data.
        :return: An iterator yielding a byte string for each record in
            memos_for_retrieval.
        """
        # use a generator for memory friendliness
        for from_backing_knit, version_id, start, end in memos_for_retrieval:
            if not from_backing_knit:
                if version_id is not self.stream_index:
                    raise AssertionError()
                yield self.data[start:end]
                continue
            # we have been asked to thunk. This thunking only occurs when
            # obtaining data from the backing knit; for now, this is
            # sufficient.
            if self.orig_factory.__class__ != KnitPlainFactory:
                raise errors.KnitCorrupt(
                    self, 'Bad thunk request %r cannot be backed by %r' %
                    (version_id, self.orig_factory))
            lines = self.backing_knit.get_lines(version_id)
            line_bytes = ''.join(lines)
            digest = sha_string(line_bytes)
            # the packed form of the fulltext always has a trailing newline,
            # even if the actual text does not, unless the file is empty. the
            # record options including the noeol flag are passed through by
            # _StreamIndex, so this is safe.
            if lines:
                if lines[-1][-1] != '\n':
                    lines[-1] = lines[-1] + '\n'
                    line_bytes += '\n'
            # We want plain data, because we expect to thunk only to allow text
            # extraction.
            size, bytes = self.backing_knit._data._record_to_data(version_id,
                digest, lines, line_bytes)
            yield bytes

            method = self.get_method(version_id)
            parent_ids = self.get_parents_with_ghosts(version_id)
            noeol = ('no-eol' in self.get_options(version_id))
            index_memo = self.get_position(version_id)
            from_backing_knit = index_memo[0]
            if from_backing_knit:
                # texts retrieved from the backing knit are always full texts
                method = 'fulltext'
            if method == 'fulltext':
                compression_parent = None
            else:
                compression_parent = parent_ids[0]
            result[version_id] = (index_memo, compression_parent,
                parent_ids, (method, noeol))

    def get_method(self, version_id):
        """Return compression method of specified version."""
        options = self.get_options(version_id)
        if 'fulltext' in options:
            return 'fulltext'
        elif 'line-delta' in options:
            return 'line-delta'

                % (version_id, e.__class__.__name__, str(e)))

    def _split_header(self, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._access,
                'unexpected number of elements in record header')
        return rec

    def _check_header_version(self, rec, version_id):
        if rec[1] != version_id:
            raise KnitCorrupt(self._access,
                'unexpected version, wanted %r, got %r'
                % (version_id, rec[1]))

    def _check_header(self, version_id, line):
        rec = self._split_header(line)
        self._check_header_version(rec, version_id)
        return rec
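
    # The header split above is the first line of each gzipped record and,
    # as written by _record_to_data, carries exactly four fields:
    #
    #   version <version_id> <line-count> <sha1-digest>
    #
    # hence the rec[1] (version), rec[2] (line count) and rec[3] (digest)
    # accesses in _parse_record_unchecked below.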

    def _parse_record_unchecked(self, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))
        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
                (data, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._split_header(header)
        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._access,
                'incorrect number of lines %s != %s'
                ' for version {%s}'
                % (len(record_contents), int(rec[2]),
                   rec[1]))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._access,
                'unexpected version end line %r, wanted %r'
                % (last_line, rec[1]))
        df.close()
        return rec, record_contents

    def _parse_record(self, version_id, data):
        rec, record_contents = self._parse_record_unchecked(data)
        self._check_header_version(rec, version_id)
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.

        Each item the iterator yields is (version_id, bytes,
            expected_sha1_of_full_text).
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            needed_offsets = [index_memo for version_id, index_memo
                in records]
            raw_records = self._access.get_raw_records(needed_offsets)

        for version_id, index_memo in records:
            data = raw_records.next()
            # validate the header
            df, rec = self._parse_record_header(version_id, data)
            df.close()
            yield version_id, data, rec[3]

    def read_records_iter(self, records):
        """Read text records from data file and yield result.