         internal representation is
         (start, end, count, [1..count tuples (revid, newline)])
-        decode_utf8 = cache_utf8.decode
         lines = iter(lines)
         next = lines.next
+        def cache_and_return(line):
+            origin, text = line.split(' ', 1)
+            return cache.setdefault(origin, origin), text
         # walk through the lines parsing.
         for header in lines:
             start, end, count = [int(n) for n in header.split(',')]
+            contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
+            result.append((start, end, count, contents))
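
The loop above parses the serialized line-delta format: each hunk is a "start,end,count" header line followed by count lines of "origin text". A minimal standalone sketch of that parser (editor's illustration, not part of knit.py; the function name and the sample revision ids are hypothetical):

def example_parse_line_delta(lines):
    # Python 2 sketch mirroring the annotated delta parsing loop shown above.
    lines = iter(lines)
    next = lines.next
    result = []
    for header in lines:
        start, end, count = [int(n) for n in header.split(',')]
        contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
        result.append((start, end, count, contents))
    return result

# example_parse_line_delta(['1,2,2\n', 'rev-a new line one\n', 'rev-b new line two\n'])
# => [(1, 2, 2, [('rev-a', 'new line one\n'), ('rev-b', 'new line two\n')])]
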
+    def get_fulltext_content(self, lines):
+        """Extract just the content lines from a fulltext."""
+        return (line.split(' ', 1)[1] for line in lines)
+    def get_linedelta_content(self, lines):
+        """Extract just the content from a line delta.
+        This doesn't return all of the extra information stored in a delta.
+        Only the actual content lines.
+            header = header.split(',')
+            count = int(header[2])
+            for i in xrange(count):
                 origin, text = next().split(' ', 1)
-                contents.append((decode_utf8(origin), text))
-            result.append((start, end, count, contents))
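
For the same serialized delta, get_linedelta_content is meant to yield only the text portion of each "origin text" line (the yield itself is elided from this hunk). A hypothetical example (editor's illustration):

# list(factory.get_linedelta_content(
#     ['1,2,2\n', 'rev-a new line one\n', 'rev-b new line two\n']))
# => ['new line one\n', 'new line two\n']
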
     def lower_fulltext(self, content):
         """convert a fulltext content record into a serializable form.
         see parse_fulltext which this inverts.
-        encode_utf8 = cache_utf8.encode
-        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
+        # TODO: jam 20070209 We only do the caching thing to make sure that
+        #       the origin is a valid utf-8 line, eventually we could remove it
+        return ['%s %s' % (o, t) for o, t in content._lines]
     def lower_line_delta(self, delta):
         """convert a delta into a serializable form.
         See parse_line_delta which this inverts.
-        encode_utf8 = cache_utf8.encode
+        # TODO: jam 20070209 We only do the caching thing to make sure that
+        #       the origin is a valid utf-8 line, eventually we could remove it
         for start, end, c, lines in delta:
             out.append('%d,%d,%d\n' % (start, end, c))
-            out.extend(encode_utf8(origin) + ' ' + text
+            out.extend(origin + ' ' + text
                        for origin, text in lines)
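
lower_line_delta is the inverse of parse_line_delta: it flattens the internal (start, end, count, lines) tuples back into header and "origin text" lines. A hypothetical round trip (editor's illustration):

# factory.lower_line_delta(
#     [(1, 2, 2, [('rev-a', 'new line one\n'), ('rev-b', 'new line two\n')])])
# => ['1,2,2\n', 'rev-a new line one\n', 'rev-b new line two\n']
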
     annotated = False
-    def parse_fulltext(self, content, version):
+    def parse_fulltext(self, content, version_id):
         """This parses an unannotated fulltext.
         Note that this is not a noop - the internal representation
         has (versionid, line) - its just a constant versionid.
-        return self.make(content, version)
+        return self.make(content, version_id)
-    def parse_line_delta_iter(self, lines, version):
-            header = lines.pop(0)
+    def parse_line_delta_iter(self, lines, version_id):
+        num_lines = len(lines)
+        while cur < num_lines:
             start, end, c = [int(n) for n in header.split(',')]
-            yield start, end, c, zip([version] * c, lines[:c])
-    def parse_line_delta(self, lines, version):
-        return list(self.parse_line_delta_iter(lines, version))
+            yield start, end, c, zip([version_id] * c, lines[cur:cur+c])
+    def parse_line_delta(self, lines, version_id):
+        return list(self.parse_line_delta_iter(lines, version_id))
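
In the plain (unannotated) factory there are no per-line origins on disk, so parsing attaches the caller-supplied version_id to every line. A hypothetical example (editor's illustration; the header bookkeeping lines of the loop are elided from this hunk):

# factory.parse_line_delta(['1,2,2\n', 'new line one\n', 'new line two\n'], 'rev-c')
# => [(1, 2, 2, [('rev-c', 'new line one\n'), ('rev-c', 'new line two\n')])]
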
+    def get_fulltext_content(self, lines):
+        """Extract just the content lines from a fulltext."""
+    def get_linedelta_content(self, lines):
+        """Extract just the content from a line delta.
+        This doesn't return all of the extra information stored in a delta.
+        Only the actual content lines.
+            header = header.split(',')
+            count = int(header[2])
+            for i in xrange(count):
     def lower_fulltext(self, content):
         return content.text()
         return '%s(%s)' % (self.__class__.__name__,
                            self.transport.abspath(self.filename))
+    def _check_should_delta(self, first_parents):
+        """Iterate back through the parent listing, looking for a fulltext.
+        This is used when we want to decide whether to add a delta or a new
+        fulltext. It searches for _max_delta_chain parents. When it finds a
+        fulltext parent, it sees if the total size of the deltas leading up to
+        it is large enough to indicate that we want a new full text anyway.
+        Return True if we should create a new delta, False if we should use a
+        delta_parents = first_parents
+        for count in xrange(self._max_delta_chain):
+            parent = delta_parents[0]
+            method = self._index.get_method(parent)
+            pos, size = self._index.get_position(parent)
+            if method == 'fulltext':
+            delta_parents = self._index.get_parents(parent)
+            # We couldn't find a fulltext, so we must create a new one
+        return fulltext_size > delta_size
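
The decision rule above walks at most _max_delta_chain parents, nearest parent first, summing delta sizes until it reaches a fulltext; a delta is only worth storing while that running total stays below the fulltext size. A standalone sketch of the same rule (editor's illustration; the chain is a hypothetical list of (method, size) pairs and the default limit is made up):

def example_should_delta(chain, max_delta_chain=200):
    # Returns True to store a new delta, False to store a new fulltext.
    delta_size = 0
    for method, size in chain[:max_delta_chain]:
        if method == 'fulltext':
            return size > delta_size
        delta_size += size
    # No fulltext found within the window: force a new fulltext.
    return False

# example_should_delta([('line-delta', 120), ('line-delta', 180), ('fulltext', 4000)])
# => True (the accumulated deltas are still much smaller than the fulltext)
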
     def _add_delta(self, version_id, parents, delta_parent, sha1, noeol, delta):
         """See VersionedFile._add_delta()."""
         self._check_add(version_id, []) # should we check the lines ?
         """See VersionedFile.iter_lines_added_or_present_in_versions()."""
         if version_ids is None:
             version_ids = self.versions()
+        version_ids = [osutils.safe_revision_id(v) for v in version_ids]
             pb = progress.DummyProgress()
         # we don't care about inclusions, the caller cares.
         # but we need to setup a list of records to visit.
         # we need version_id, position, length
         version_id_records = []
-        requested_versions = list(version_ids)
+        requested_versions = set(version_ids)
         # filter for available versions
         for version_id in requested_versions:
             if not self.has_version(version_id):
                 raise RevisionNotPresent(version_id, self.filename)
         # get a in-component-order queue:
         for version_id in self.versions():
             if version_id in requested_versions:
-                version_ids.append(version_id)
                 data_pos, length = self._index.get_position(version_id)
                 version_id_records.append((version_id, data_pos, length))
         total = len(version_id_records)
-        pb.update('Walking content.', count, total)
-        for version_id, data, sha_value in \
-            self._data.read_records_iter(version_id_records):
-            pb.update('Walking content.', count, total)
+        for version_idx, (version_id, data, sha_value) in \
+            enumerate(self._data.read_records_iter(version_id_records)):
+            pb.update('Walking content.', version_idx, total)
             method = self._index.get_method(version_id)
-            version_idx = self._index.lookup(version_id)
             assert method in ('fulltext', 'line-delta')
             if method == 'fulltext':
-                content = self.factory.parse_fulltext(data, version_idx)
-                for line in content.text():
+                line_iterator = self.factory.get_fulltext_content(data)
-                delta = self.factory.parse_line_delta(data, version_idx)
-                for start, end, count, lines in delta:
-                    for origin, line in lines:
+                line_iterator = self.factory.get_linedelta_content(data)
+            for line in line_iterator:
         pb.update('Walking content.', total, total)
     def num_versions(self):
         # so - wc -l of a knit index is != the number of unique names
         self._history = []
-        pb = bzrlib.ui.ui_factory.nested_progress_bar()
-                pb.update('read knit index', count, total)
-                fp = self._transport.get(self._filename)
-                    self.check_header(fp)
-                    # readlines reads the whole file at once:
-                    # bad for transports like http, good for local disk
-                    # we save 60 ms doing this one change (
-                    # from calling readline each time to calling
-                    # probably what we want for nice behaviour on
-                    # http is a incremental readlines that yields, or
-                    # a check for local vs non local indexes,
-                    for l in fp.readlines():
-                        if len(rec) < 5 or rec[-1] != ':':
-                            # FIXME: in the future we should determine if its a
-                            # short write - and ignore it
-                            # or a different failure, and raise. RBC 20060407
-                        #pb.update('read knit index', count, total)
-                        # See self._parse_parents
-                        for value in rec[4:-1]:
-                                # uncompressed reference
-                                parents.append(value[1:])
-                                # this is 15/4000ms faster than isinstance,
-                                # this function is called thousands of times a
-                                # second so small variations add up.
-                                assert value.__class__ is str
-                                parents.append(self._history[int(value)])
-                        # end self._parse_parents
-                        # self._cache_version(rec[0],
-                        #                     rec[1].split(','),
-                        # --- self._cache_version
-                        # only want the _history index to reference the 1st
-                        # index entry for version_id
-                        if version_id not in self._cache:
-                            index = len(self._history)
-                            self._history.append(version_id)
-                            index = self._cache[version_id][5]
-                        self._cache[version_id] = (version_id,
-                        # --- self._cache_version
-            except NoSuchFile, e:
-                if mode != 'w' or not create:
-                    self._need_to_create = True
-                    self._transport.put_bytes_non_atomic(self._filename,
-                        self.HEADER, mode=self._file_mode)
-            pb.update('read knit index', total, total)
-    def _parse_parents(self, compressed_parents):
-        """convert a list of string parent values into version ids.
-        ints are looked up in the index.
-        .FOO values are ghosts and converted in to FOO.
-        NOTE: the function is retained here for clarity, and for possible
-              use in partial index reads. However bulk processing now has
-              it inlined in __init__ for inner-loop optimisation.
-        for value in compressed_parents:
-            if value[-1] == '.':
-                # uncompressed reference
-                result.append(value[1:])
-                # this is 15/4000ms faster than isinstance,
-                # this function is called thousands of times a
-                # second so small variations add up.
-                assert value.__class__ is str
-                result.append(self._history[int(value)])
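
A worked example of the compressed parent encoding described in the docstring above (editor's illustration with hypothetical values): plain integer entries index into self._history, while a '.'-marked entry carries the version id verbatim (the docstring's ".FOO" ghost form), so with self._history == ['rev-base', 'rev-one']:

# self._parse_parents(['0', '.rev-ghost', '1'])
# => ['rev-base', 'rev-ghost', 'rev-one']
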
+            fp = self._transport.get(self._filename)
+                # _load_data may raise NoSuchFile if the target knit is
+            if mode != 'w' or not create:
+                self._need_to_create = True
+                self._transport.put_bytes_non_atomic(
+                    self._filename, self.HEADER, mode=self._file_mode)
+    def _load_data(self, fp):
+        history = self._history
+        self.check_header(fp)
+        # readlines reads the whole file at once:
+        # bad for transports like http, good for local disk
+        # we save 60 ms doing this one change (
+        # from calling readline each time to calling
+        # probably what we want for nice behaviour on
+        # http is a incremental readlines that yields, or
+        # a check for local vs non local indexes,
+        history_top = len(history) - 1
+        for line in fp.readlines():
+            if len(rec) < 5 or rec[-1] != ':':
+                # FIXME: in the future we should determine if its a
+                # short write - and ignore it
+                # or a different failure, and raise. RBC 20060407
+                for value in rec[4:-1]:
+                        # uncompressed reference
+                        parent_id = value[1:]
+                        parent_id = history[int(value)]
+                    parents.append(parent_id)
+            except (IndexError, ValueError), e:
+                # The parent could not be decoded to get its parent row. This
+                # at a minimum will cause this row to have wrong parents, or
+                # even to apply a delta to the wrong base and decode
+                # incorrectly. its therefore not usable, and because we have
+                # encountered a situation where a new knit index had this
+                # corrupt we can't asssume that no other rows referring to the
+                # index of this record actually mean the subsequent uncorrupt
+                raise errors.KnitCorrupt(self._filename,
+                    "line %r: %s" % (rec, e))
+            version_id, options, pos, size = rec[:4]
+            version_id = version_id
+            # See self._cache_version
+            # only want the _history index to reference the 1st
+            # index entry for version_id
+            if version_id not in cache:
+                history.append(version_id)
+                index = cache[version_id][5]
+            cache[version_id] = (version_id,
+            # end self._cache_version
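
The cache entry built above is a six-element tuple; the field layout below is inferred from its construction here and from the [2], [3], [4] and [5] accesses elsewhere in this diff (the values are hypothetical):

# cache['rev-one'] == ('rev-one',       # [0] version_id
#                      ['fulltext'],    # [1] options
#                      0,               # [2] data position
#                      123,             # [3] data size
#                      ['rev-base'],    # [4] parents
#                      0)               # [5] index into self._history
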
     def get_graph(self):
-        for version_id, index in self._cache.iteritems():
-            graph.append((version_id, index[4]))
+        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]
-    def get_ancestry(self, versions):
+    def get_ancestry(self, versions, topo_sorted=True):
         """See VersionedFile.get_ancestry."""
         # get a graph of all the mentioned versions:
         pending = set(versions)
             version = pending.pop()
-            parents = self._cache[version][4]
-            # got the parents ok
-            parents = [parent for parent in parents if parent in self._cache]
-            for parent in parents:
-                # if not completed and not a ghost
-                if parent not in graph:
+                parents = [p for p in cache[version][4] if p in cache]
+                raise RevisionNotPresent(version, self._filename)
+            # if not completed and not a ghost
+            pending.update([p for p in parents if p not in graph])
             graph[version] = parents
         return topo_sort(graph.items())
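
A worked example of the ancestry walk (editor's illustration, hypothetical versions): only parents present in the cache are kept, so ghosts drop out of the graph. With 'rev-base' having no parents and 'rev-one' having parents ['rev-base', 'rev-ghost']:

# get_ancestry(['rev-one']) builds graph == {'rev-one': ['rev-base'], 'rev-base': []}
# and topo_sort(graph.items()) returns ['rev-base', 'rev-one']
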
     def get_ancestry_with_ghosts(self, versions):
         """See VersionedFile.get_ancestry_with_ghosts."""
         # get a graph of all the mentioned versions:
+        self.check_versions_present(versions)
         pending = set(versions)
             version = pending.pop()
-                parents = self._cache[version][4]
+                parents = cache[version][4]
             except KeyError:
                 # ghost, fake it
                 graph[version] = []
-                # got the parents ok
-                for parent in parents:
-                    if parent not in graph:
+                pending.update([p for p in parents if p not in graph])
                 graph[version] = parents
         return topo_sort(graph.items())
                          (version_id, options, pos, size, parents).
-        encode_utf8 = cache_utf8.encode
-        for version_id, options, pos, size, parents in versions:
-            line = "\n%s %s %s %s %s :" % (encode_utf8(version_id),
-                                           self._version_list_to_index(parents))
-            assert isinstance(line, str), \
-                'content must be utf-8 encoded: %r' % (line,)
-        if not self._need_to_create:
-            self._transport.append_bytes(self._filename, ''.join(lines))
-            sio.write(self.HEADER)
-            sio.writelines(lines)
-            self._transport.put_file_non_atomic(self._filename, sio,
-                                create_parent_dir=self._create_parent_dir,
-                                mode=self._file_mode,
-                                dir_mode=self._dir_mode)
-            self._need_to_create = False
-        # cache after writing, so that a failed write leads to missing cache
-        # entries not extra ones. XXX TODO: RBC 20060502 in the event of a
-        # failure, reload the index or flush it or some such, to prevent
-        # writing records that did complete twice.
-        for version_id, options, pos, size, parents in versions:
-            self._cache_version(version_id, options, pos, size, parents)
+        orig_history = self._history[:]
+        orig_cache = self._cache.copy()
+            for version_id, options, pos, size, parents in versions:
+                line = "\n%s %s %s %s %s :" % (version_id,
+                                               self._version_list_to_index(parents))
+                assert isinstance(line, str), \
+                    'content must be utf-8 encoded: %r' % (line,)
+                self._cache_version(version_id, options, pos, size, parents)
+            if not self._need_to_create:
+                self._transport.append_bytes(self._filename, ''.join(lines))
+                sio.write(self.HEADER)
+                sio.writelines(lines)
+                self._transport.put_file_non_atomic(self._filename, sio,
+                                    create_parent_dir=self._create_parent_dir,
+                                    mode=self._file_mode,
+                                    dir_mode=self._dir_mode)
+                self._need_to_create = False
+            # If any problems happen, restore the original values and re-raise
+            self._history = orig_history
+            self._cache = orig_cache
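
Each entry is serialized as a single index line by the format string above; a hypothetical example (editor's illustration), with the parent field in the compressed encoding used by _parse_parents and _load_data:

# 'rev-one fulltext 0 123 0 .rev-ghost :'
# i.e. version_id, comma-separated options, data position, size,
# parent references (history indexes or '.'-prefixed ghosts), and a ':' terminator
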
     def has_version(self, version_id):
         """True if the version is in the index."""
-        return (version_id in self._cache)
+        return version_id in self._cache
     def get_position(self, version_id):
         """Return data position and size of specified version."""
-        return (self._cache[version_id][2], \
-                self._cache[version_id][3])
+        entry = self._cache[version_id]
+        return entry[2], entry[3]
     def get_method(self, version_id):
         """Return compression method of specified version."""
                  as (stream, header_record)
         df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
-        rec = df.readline().split()
+            rec = self._check_header(version_id, df.readline())
+        except Exception, e:
+            raise KnitCorrupt(self._filename,
+                              "While reading {%s} got %s(%s)"
+                              % (version_id, e.__class__.__name__, str(e)))
+    def _check_header(self, version_id, line):
         if len(rec) != 4:
-            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
-        if cache_utf8.decode(rec[1]) != version_id:
-            raise KnitCorrupt(self._filename,
-                              'unexpected version, wanted %r, got %r' % (
-                                  version_id, rec[1]))
+            raise KnitCorrupt(self._filename,
+                              'unexpected number of elements in record header')
+        if rec[1] != version_id:
+            raise KnitCorrupt(self._filename,
+                              'unexpected version, wanted %r, got %r'
+                              % (version_id, rec[1]))
     def _parse_record(self, version_id, data):
         # profiling notes:
         # 4168 calls in 2880 217 internal
         # 4168 calls to _parse_record_header in 2121
         # 4168 calls to readlines in 330
-        df, rec = self._parse_record_header(version_id, data)
-        record_contents = df.readlines()
-        l = record_contents.pop()
-        assert len(record_contents) == int(rec[2])
-        if l != 'end %s\n' % cache_utf8.encode(version_id):
-            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r'
+        df = GzipFile(mode='rb', fileobj=StringIO(data))
+            record_contents = df.readlines()
+        except Exception, e:
+            raise KnitCorrupt(self._filename,
+                              "While reading {%s} got %s(%s)"
+                              % (version_id, e.__class__.__name__, str(e)))
+        header = record_contents.pop(0)
+        rec = self._check_header(version_id, header)
+        last_line = record_contents.pop()
+        if len(record_contents) != int(rec[2]):
+            raise KnitCorrupt(self._filename,
+                              'incorrect number of lines %s != %s'
+                              % (len(record_contents), int(rec[2]),
+        if last_line != 'end %s\n' % rec[1]:
+            raise KnitCorrupt(self._filename,
+                              'unexpected version end line %r, wanted %r'
+                              % (last_line, version_id))
         return record_contents, rec[3]
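
The checks above imply the layout of one gzipped record: a four-field header whose second field is the version id, whose third is the number of content lines and whose fourth is returned to the caller, then the content lines, then an 'end <version_id>' line. A hypothetical annotated fulltext record (editor's illustration; the exact header literal is not shown in this hunk):

# version rev-one 2 <digest>
# rev-a line one
# rev-b line two
# end rev-one
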