    pb.update('read knit index', count, total)
    fp = self._transport.get(self._filename)
    self.check_header(fp)
    # readlines reads the whole file at once:
    # bad for transports like http, good for local disk
    # we save 60 ms doing this one change (
    # from calling readline each time to calling
    # readlines once)
    # probably what we want for nice behaviour on
    # http is an incremental readlines that yields, or
    # a check for local vs non-local indexes,
    for l in fp.readlines():
        rec = l.split()
        if len(rec) < 5 or rec[-1] != ':':
            # FIXME: in the future we should determine if it's a
            # short write - and ignore it,
            # or a different failure, and raise. RBC 20060407
            continue
        #pb.update('read knit index', count, total)
        # See self._parse_parents
        parents = []
        for value in rec[4:-1]:
            if '.' == value[0]:
                # uncompressed reference
                parents.append(value[1:])
            else:
                # this is 15/4000ms faster than isinstance,
                # this function is called thousands of times a
                # second, so small variations add up.
                assert value.__class__ is str
                parents.append(self._history[int(value)])
        # end self._parse_parents
        # self._cache_version(rec[0],
        #                     rec[1].split(','),
        #                     int(rec[2]),
        #                     int(rec[3]),
        #                     parents)
        # --- self._cache_version
        # only want the _history index to reference the 1st
        # index entry for version_id
        version_id = rec[0]
        if version_id not in self._cache:
            index = len(self._history)
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   rec[1].split(','),
                                   int(rec[2]),
                                   int(rec[3]),
                                   parents,
                                   index)
        # --- self._cache_version
except NoSuchFile, e:
    if mode != 'w' or not create:
        raise
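
For reference, this is the shape of a single knit index line and the decoding the loop above performs, as a standalone sketch with made-up revision ids (the field layout is taken directly from the parsing code; it is not additional bzrlib API):

history = ['parent-rev']        # compressed parent references index into this
line = 'this-rev fulltext,no-eol 0 431 .other-rev 0 :'
rec = line.split()
assert len(rec) >= 5 and rec[-1] == ':'
version_id = rec[0]
options = rec[1].split(',')                  # e.g. ['fulltext', 'no-eol']
pos, size = int(rec[2]), int(rec[3])         # offset and length in the .knit file
parents = []
for value in rec[4:-1]:
    if '.' == value[0]:
        parents.append(value[1:])            # uncompressed reference
    else:
        parents.append(history[int(value)])  # index into the history list
# parents == ['other-rev', 'parent-rev']
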
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode)
        self._checked = False
+        # TODO: jam 20060713 conceptually, this could spill to disk
+        #       if the cached size gets larger than a certain amount
+        #       but it complicates the model a bit, so for now just use
+        #       a simple dictionary
+        self._cache = {}
+        self._do_cache = False
        if create:
            self._transport.put(self._filename, StringIO(''), mode=file_mode)

+    def enable_cache(self):
+        """Enable caching of reads."""
+        self._do_cache = True
+
    def clear_cache(self):
        """Clear the record cache."""
+        self._do_cache = False
+        self._cache = {}
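
A hedged usage sketch of the new cache switches: the intended pattern is to bracket a batch of reads (for example during a fetch) with enable_cache()/clear_cache(), so repeated requests for the same record are served from memory and that memory is released afterwards. The transport, records and consumer below are illustrative names, not part of this diff:

knit_data = _KnitData(transport, 'foo.knit', 'r')
knit_data.enable_cache()
try:
    for version_id, content, digest in knit_data.read_records_iter(records):
        pass                     # consume each record; duplicates now come from the cache
finally:
    knit_data.clear_cache()      # drop the cached raw records
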
    def _open_file(self):
-        if self._file is None:
-            self._file = self._transport.get(self._filename)
+        return self._transport.get(self._filename)
    def _record_to_data(self, version_id, digest, lines):
        """Convert version_id, digest, lines into a raw data block.
    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.
-
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in records])
+            if self._cache:
+                # Don't check _cache if it is empty
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records
+                                              if version_id not in self._cache]
+            else:
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records]
+
+            raw_records = self._transport.readv(self._filename, needed_offsets)
        for version_id, pos, size in records:
-            pos, data = raw_records.next()
-            # validate the header
-            df, rec = self._parse_record_header(version_id, data)
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
+            else:
+                pos, data = raw_records.next()
+                if self._do_cache:
+                    self._cache[version_id] = data
+                # validate the header
+                df, rec = self._parse_record_header(version_id, data)
            yield version_id, data
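
Distilled, the new raw read path is a cache-first generator: readv() is asked only for offsets that are not already cached, and its result stream is consumed in step with the uncached entries. A standalone sketch under those assumptions (readv here is any callable yielding (pos, data) pairs for the offsets it is given, in order; it is not the bzrlib transport API):

def iter_with_cache(records, cache, readv):
    # records is a list of (key, pos, size) tuples
    needed = [(pos, size) for key, pos, size in records if key not in cache]
    results = iter(readv(needed))
    for key, pos, size in records:
        if key in cache:
            data = cache[key]
        else:
            pos, data = next(results)
            cache[key] = data
        yield key, data
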
    def read_records_iter(self, records):
        """Read text records from data file and yield result.

-        Each passed record is a tuple of (version_id, pos, len) and
-        will be read in the given order.  Yields (version_id,
-        contents, digest).
+        The result will be returned in whatever order is fastest to read,
+        not the order requested. Also, multiple requests for the same
+        record will only yield one response.
+        :param records: A list of (version_id, pos, len) entries
+        :return: Yields (version_id, contents, digest) in the order
+                 read, not the order requested
        """
-        if len(records) == 0:
-            return
-        # 60890  calls for 4168 extractions in 5045, 683 internal.
-        # 4168   calls to readv              in 1411
-        # 4168   calls to parse_record       in 2880
-
-        # Get unique records, sorted by position
-        needed_records = sorted(set(records), key=operator.itemgetter(1))
-
-        # We take it that the transport optimizes the fetching as well
-        # as possible (i.e., reads continuous ranges.)
-        response = self._transport.readv(self._filename,
+        if not records:
+            return
+
+        if self._cache:
+            # Skip records we have already seen
+            yielded_records = set()
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    if record[0] in yielded_records:
+                        continue
+                    yielded_records.add(record[0])
+                    data = self._cache[record[0]]
+                    content, digest = self._parse_record(record[0], data)
+                    yield (record[0], content, digest)
+                else:
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+        else:
+            needed_records = sorted(set(records), key=operator.itemgetter(1))
+
+        if not needed_records:
+            return
+
+        # The transport optimizes the fetching as well
+        # (i.e., reads continuous ranges.)
+        readv_response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in needed_records])
-        for (record_id, pos, size), (pos, data) in \
-            izip(iter(needed_records), response):
-            content, digest = self._parse_record(record_id, data)
-            record_map[record_id] = (digest, content)
-
-        for version_id, pos, size in records:
-            digest, content = record_map[version_id]
+        for (version_id, pos, size), (pos, data) in \
+                izip(iter(needed_records), readv_response):
+            content, digest = self._parse_record(version_id, data)
+            if self._do_cache:
+                self._cache[version_id] = data
            yield version_id, content, digest
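
The "reads continuous ranges" remark refers to the transport coalescing adjacent offsets into fewer, larger reads once the requests are sorted by position, roughly along these lines (an illustrative sketch, not the actual bzrlib transport code):

def coalesce(offsets):
    """Merge sorted (pos, size) pairs that touch or overlap."""
    merged = []
    for pos, size in sorted(offsets):
        if merged and pos <= merged[-1][0] + merged[-1][1]:
            last_pos, last_size = merged[-1]
            merged[-1] = (last_pos, max(last_size, pos + size - last_pos))
        else:
            merged.append((pos, size))
    return merged

# coalesce([(0, 100), (100, 50), (400, 10)]) -> [(0, 150), (400, 10)]
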
    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
-        for record_id, content, digest in self.read_records_iter(records):
+        for record_id, content, digest in \
+                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components
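
A short usage sketch of the dictionary wrapper: the record tuples come from the index as (version_id, pos, size); the ids, positions and sizes below are made up, and knit_data is the illustrative instance from the earlier sketch:

records = [('rev-1', 0, 431), ('rev-2', 431, 212)]
components = knit_data.read_records(records)
content, digest = components['rev-1']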