/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

Viewing changes to bzrlib/knit.py

  • Committer: Jan Balster
  • Date: 2006-08-15 12:39:42 UTC
  • mfrom: (1923 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1928.
  • Revision ID: jan@merlinux.de-20060815123942-22c388c6e9a8ac91
merge bzr.dev 1923

@@ -374,12 +374,20 @@
         """
         # write all the data
         pos = self._data.add_raw_record(data)
+        offset = 0
         index_entries = []
         for (version_id, options, parents, size) in records:
-            index_entries.append((version_id, options, pos, size, parents))
-            pos += size
+            index_entries.append((version_id, options, pos+offset,
+                                  size, parents))
+            if self._data._do_cache:
+                self._data._cache[version_id] = data[offset:offset+size]
+            offset += size
         self._index.add_versions(index_entries)
 
+    def enable_cache(self):
+        """Start caching data for this knit"""
+        self._data.enable_cache()
+
     def clear_cache(self):
         """Clear the data cache only."""
         self._data.clear_cache()
@@ -390,9 +398,13 @@
         # writes
         transport.put(name + INDEX_SUFFIX + '.tmp', self.transport.get(self._index._filename),)
         # copy the data file
-        transport.put(name + DATA_SUFFIX, self._data._open_file())
-        # rename the copied index into place
-        transport.rename(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
+        f = self._data._open_file()
+        try:
+            transport.put(name + DATA_SUFFIX, f)
+        finally:
+            f.close()
+        # move the copied index into place
+        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
 
     def create_empty(self, name, transport, mode=None):
         return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
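
The two hunks above are part of the read-cache work merged from bzr.dev: add_raw_records now keeps a running offset into the concatenated data block so that, when caching is enabled, each version's raw bytes can be sliced out into self._data._cache, and copy_to now closes the data file it copies and uses transport.move for the index. A minimal standalone sketch of the offset-slicing idea (plain Python, not bzrlib; cache_raw_slices and the (version_id, size) record shape are illustrative only):

    # Slice one concatenated data block into per-version cache entries,
    # mirroring the running-offset bookkeeping the hunk adds to
    # add_raw_records.
    def cache_raw_slices(data, records, cache, do_cache=True):
        """records: iterable of (version_id, size); returns (id, offset, size) entries."""
        offset = 0
        entries = []
        for version_id, size in records:
            entries.append((version_id, offset, size))
            if do_cache:
                # the same slice the real code stores in self._data._cache
                cache[version_id] = data[offset:offset + size]
            offset += size
        return entries

    # Example: two records of 5 and 3 bytes packed into one block.
    cache = {}
    entries = cache_raw_slices(b'aaaaabbb', [('v1', 5), ('v2', 3)], cache)
    assert cache['v2'] == b'bbb' and entries[1] == ('v2', 5, 3)
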
@@ -691,8 +703,8 @@
         # c = component_id, m = method, p = position, s = size, n = next
         records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
         record_map = {}
-        for component_id, content, digest in\
-            self._data.read_records_iter(records): 
+        for component_id, content, digest in \
+                self._data.read_records_iter(records):
             method, position, size, next = position_map[component_id]
             record_map[component_id] = method, content, digest, next
 
@@ -1044,61 +1056,64 @@
             try:
                 pb.update('read knit index', count, total)
                 fp = self._transport.get(self._filename)
-                self.check_header(fp)
-                # readlines reads the whole file at once:
-                # bad for transports like http, good for local disk
-                # we save 60 ms doing this one change (
-                # from calling readline each time to calling
-                # readlines once.
-                # probably what we want for nice behaviour on
-                # http is a incremental readlines that yields, or
-                # a check for local vs non local indexes,
-                for l in fp.readlines():
-                    rec = l.split()
-                    if len(rec) < 5 or rec[-1] != ':':
-                        # corrupt line.
-                        # FIXME: in the future we should determine if its a
-                        # short write - and ignore it 
-                        # or a different failure, and raise. RBC 20060407
-                        continue
-                    count += 1
-                    total += 1
-                    #pb.update('read knit index', count, total)
-                    # See self._parse_parents
-                    parents = []
-                    for value in rec[4:-1]:
-                        if '.' == value[0]:
-                            # uncompressed reference
-                            parents.append(value[1:])
+                try:
+                    self.check_header(fp)
+                    # readlines reads the whole file at once:
+                    # bad for transports like http, good for local disk
+                    # we save 60 ms doing this one change (
+                    # from calling readline each time to calling
+                    # readlines once.
+                    # probably what we want for nice behaviour on
+                    # http is a incremental readlines that yields, or
+                    # a check for local vs non local indexes,
+                    for l in fp.readlines():
+                        rec = l.split()
+                        if len(rec) < 5 or rec[-1] != ':':
+                            # corrupt line.
+                            # FIXME: in the future we should determine if its a
+                            # short write - and ignore it 
+                            # or a different failure, and raise. RBC 20060407
+                            continue
+                        count += 1
+                        total += 1
+                        #pb.update('read knit index', count, total)
+                        # See self._parse_parents
+                        parents = []
+                        for value in rec[4:-1]:
+                            if '.' == value[0]:
+                                # uncompressed reference
+                                parents.append(value[1:])
+                            else:
+                                # this is 15/4000ms faster than isinstance,
+                                # (in lsprof)
+                                # this function is called thousands of times a 
+                                # second so small variations add up.
+                                assert value.__class__ is str
+                                parents.append(self._history[int(value)])
+                        # end self._parse_parents
+                        # self._cache_version(rec[0], 
+                        #                     rec[1].split(','),
+                        #                     int(rec[2]),
+                        #                     int(rec[3]),
+                        #                     parents)
+                        # --- self._cache_version
+                        # only want the _history index to reference the 1st 
+                        # index entry for version_id
+                        version_id = rec[0]
+                        if version_id not in self._cache:
+                            index = len(self._history)
+                            self._history.append(version_id)
                         else:
-                            # this is 15/4000ms faster than isinstance,
-                            # (in lsprof)
-                            # this function is called thousands of times a 
-                            # second so small variations add up.
-                            assert value.__class__ is str
-                            parents.append(self._history[int(value)])
-                    # end self._parse_parents
-                    # self._cache_version(rec[0], 
-                    #                     rec[1].split(','),
-                    #                     int(rec[2]),
-                    #                     int(rec[3]),
-                    #                     parents)
-                    # --- self._cache_version
-                    # only want the _history index to reference the 1st 
-                    # index entry for version_id
-                    version_id = rec[0]
-                    if version_id not in self._cache:
-                        index = len(self._history)
-                        self._history.append(version_id)
-                    else:
-                        index = self._cache[version_id][5]
-                    self._cache[version_id] = (version_id,
-                                               rec[1].split(','),
-                                               int(rec[2]),
-                                               int(rec[3]),
-                                               parents,
-                                               index)
-                    # --- self._cache_version 
+                            index = self._cache[version_id][5]
+                        self._cache[version_id] = (version_id,
+                                                   rec[1].split(','),
+                                                   int(rec[2]),
+                                                   int(rec[3]),
+                                                   parents,
+                                                   index)
+                        # --- self._cache_version 
+                finally:
+                    fp.close()
             except NoSuchFile, e:
                 if mode != 'w' or not create:
                     raise
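
The hunk above is mostly re-indentation: the index-parsing loop moves inside an inner try/finally so the file handle returned by the transport is closed even if check_header or a parse step raises. A minimal sketch of the same shape (read_index_lines is a hypothetical helper, not bzrlib API; transport follows the diff's usage):

    # Always close the transport file, whether parsing succeeds or raises.
    def read_index_lines(transport, filename):
        fp = transport.get(filename)      # may raise NoSuchFile
        try:
            return [line.split() for line in fp.readlines()]
        finally:
            fp.close()                    # runs on success and on error
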
@@ -1276,22 +1291,31 @@
 
     def __init__(self, transport, filename, mode, create=False, file_mode=None):
         _KnitComponentFile.__init__(self, transport, filename, mode)
-        self._file = None
         self._checked = False
+        # TODO: jam 20060713 conceptually, this could spill to disk
+        #       if the cached size gets larger than a certain amount
+        #       but it complicates the model a bit, so for now just use
+        #       a simple dictionary
+        self._cache = {}
+        self._do_cache = False
         if create:
             self._transport.put(self._filename, StringIO(''), mode=file_mode)
 
+    def enable_cache(self):
+        """Enable caching of reads."""
+        self._do_cache = True
+
     def clear_cache(self):
         """Clear the record cache."""
-        pass
+        self._do_cache = False
+        self._cache = {}
 
     def _open_file(self):
-        if self._file is None:
-            try:
-                self._file = self._transport.get(self._filename)
-            except NoSuchFile:
-                pass
-        return self._file
+        try:
+            return self._transport.get(self._filename)
+        except NoSuchFile:
+            pass
+        return None
 
     def _record_to_data(self, version_id, digest, lines):
         """Convert version_id, digest, lines into a raw data block.
@@ -1326,6 +1350,8 @@
         size, sio = self._record_to_data(version_id, digest, lines)
         # write to disk
         start_pos = self._transport.append(self._filename, sio)
+        if self._do_cache:
+            self._cache[version_id] = sio.getvalue()
         return start_pos, size
 
     def _parse_record_header(self, version_id, raw_data):
@@ -1364,60 +1390,88 @@
 
         This unpacks enough of the text record to validate the id is
         as expected but thats all.
-
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
         """
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
         if len(records):
             # grab the disk data needed.
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in records])
+            if self._cache:
+                # Don't check _cache if it is empty
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records
+                                              if version_id not in self._cache]
+            else:
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                               in records]
+
+            raw_records = self._transport.readv(self._filename, needed_offsets)
+
 
         for version_id, pos, size in records:
-            pos, data = raw_records.next()
-            # validate the header
-            df, rec = self._parse_record_header(version_id, data)
-            df.close()
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
+            else:
+                pos, data = raw_records.next()
+                if self._do_cache:
+                    self._cache[version_id] = data
+
+                # validate the header
+                df, rec = self._parse_record_header(version_id, data)
+                df.close()
             yield version_id, data
 
     def read_records_iter(self, records):
         """Read text records from data file and yield result.
 
-        Each passed record is a tuple of (version_id, pos, len) and
-        will be read in the given order.  Yields (version_id,
-        contents, digest).
+        The result will be returned in whatever is the fastest to read.
+        Not by the order requested. Also, multiple requests for the same
+        record will only yield 1 response.
+        :param records: A list of (version_id, pos, len) entries
+        :return: Yields (version_id, contents, digest) in the order
+                 read, not the order requested
         """
-        if len(records) == 0:
-            return
-        # profiling notes:
-        # 60890  calls for 4168 extractions in 5045, 683 internal.
-        # 4168   calls to readv              in 1411
-        # 4168   calls to parse_record       in 2880
-
-        # Get unique records, sorted by position
-        needed_records = sorted(set(records), key=operator.itemgetter(1))
-
-        # We take it that the transport optimizes the fetching as good
-        # as possible (ie, reads continuous ranges.)
-        response = self._transport.readv(self._filename,
+        if not records:
+            return
+
+        if self._cache:
+            # Skip records we have alread seen
+            yielded_records = set()
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    if record[0] in yielded_records:
+                        continue
+                    yielded_records.add(record[0])
+                    data = self._cache[record[0]]
+                    content, digest = self._parse_record(record[0], data)
+                    yield (record[0], content, digest)
+                else:
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+        else:
+            needed_records = sorted(set(records), key=operator.itemgetter(1))
+
+        if not needed_records:
+            return
+
+        # The transport optimizes the fetching as well 
+        # (ie, reads continuous ranges.)
+        readv_response = self._transport.readv(self._filename,
             [(pos, size) for version_id, pos, size in needed_records])
 
-        record_map = {}
-        for (record_id, pos, size), (pos, data) in \
-            izip(iter(needed_records), response):
-            content, digest = self._parse_record(record_id, data)
-            record_map[record_id] = (digest, content)
-
-        for version_id, pos, size in records:
-            digest, content = record_map[version_id]
+        for (version_id, pos, size), (pos, data) in \
+                izip(iter(needed_records), readv_response):
+            content, digest = self._parse_record(version_id, data)
+            if self._do_cache:
+                self._cache[version_id] = data
            yield version_id, content, digest
 
     def read_records(self, records):
         """Read records into a dictionary."""
         components = {}
-        for record_id, content, digest in self.read_records_iter(records):
+        for record_id, content, digest in \
+                self.read_records_iter(records):
             components[record_id] = (content, digest)
         return components
 
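
The final hunks teach the read path about the cache: read_records_iter_raw and read_records_iter serve any versions already in _cache first (yielding duplicate requests only once), readv only the offsets still missing, and write freshly read data back into the cache when _do_cache is set, which is why the docstring now says results come back in read order rather than request order. A minimal sketch of that cache-first strategy (fetch and parse stand in for self._transport.readv and self._parse_record and are assumptions, not bzrlib API; the write-back step is omitted for brevity):

    import operator

    def read_records_iter(records, cache, fetch, parse):
        """records: list of (version_id, pos, size); yields (id, content, digest)."""
        if not records:
            return
        yielded = set()
        needed = set()
        for version_id, pos, size in records:
            if version_id in cache:
                if version_id in yielded:
                    continue                  # duplicate request: yield once
                yielded.add(version_id)
                content, digest = parse(version_id, cache[version_id])
                yield version_id, content, digest
            else:
                needed.add((version_id, pos, size))
        # Fetch the rest in one batch, sorted by position like readv expects.
        needed = sorted(needed, key=operator.itemgetter(1))
        for (version_id, pos, size), data in zip(
                needed, fetch([(pos, size) for _, pos, size in needed])):
            content, digest = parse(version_id, data)
            yield version_id, content, digest
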