/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: John Arbash Meinel
  • Date: 2009-07-08 14:37:25 UTC
  • mfrom: (4516 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4517.
  • Revision ID: john@arbash-meinel.com-20090708143725-sc9sjy3mz4cxwxzz
Merge bzr.dev 4516

--- bzrlib/repofmt/pack_repo.py (before merge)
+++ bzrlib/repofmt/pack_repo.py (after merge)
@@ -36,10 +36,7 @@
     )
 from bzrlib.index import (
     CombinedGraphIndex,
-    GraphIndex,
-    GraphIndexBuilder,
     GraphIndexPrefixAdapter,
-    InMemoryGraphIndex,
     )
 from bzrlib.knit import (
     KnitPlainFactory,
@@ -55,7 +52,6 @@
     lockable_files,
     lockdir,
     revision as _mod_revision,
-    symbol_versioning,
     )
 
 from bzrlib.decorators import needs_write_lock
@@ -73,8 +69,8 @@
     MetaDirRepositoryFormat,
     RepositoryFormat,
     RootCommitBuilder,
+    StreamSource,
     )
-import bzrlib.revision as _mod_revision
 from bzrlib.trace import (
     mutter,
     warning,
@@ -268,10 +264,11 @@
 
     def __init__(self, name, revision_index, inventory_index, text_index,
         signature_index, upload_transport, pack_transport, index_transport,
-        pack_collection):
+        pack_collection, chk_index=None):
         """Create a ResumedPack object."""
         ExistingPack.__init__(self, pack_transport, name, revision_index,
-            inventory_index, text_index, signature_index)
+            inventory_index, text_index, signature_index,
+            chk_index=chk_index)
         self.upload_transport = upload_transport
         self.index_transport = index_transport
         self.index_sizes = [None, None, None, None]
@@ -281,6 +278,9 @@
             ('text', text_index),
             ('signature', signature_index),
             ]
+        if chk_index is not None:
+            indices.append(('chk', chk_index))
+            self.index_sizes.append(None)
         for index_type, index in indices:
             offset = self.index_offset(index_type)
             self.index_sizes[offset] = index._size
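
This hunk and the next teach ResumedPack about an optional fifth index, the chk ('.cix') index used by CHK-carrying formats: the constructor grows a chk_index=None parameter, the indices/index_sizes bookkeeping gains one slot when it is present, and abort()/finish() below delete or rename the extra file alongside the other four. As a simplified, self-contained sketch of that "optional extra index slot" pattern (illustrative class and names only, not the bzrlib code):

    # Simplified model of the optional chk-index slot (illustrative only).
    class PackIndexSet(object):
        BASE_TYPES = ['revision', 'inventory', 'text', 'signature']

        def __init__(self, sizes, chk_size=None):
            # sizes: sizes of the four mandatory indices, in BASE_TYPES order.
            self.index_sizes = list(sizes)
            self.index_types = list(self.BASE_TYPES)
            if chk_size is not None:
                # Only formats with CHK inventories carry a fifth '.cix'
                # index, so the slot is appended on demand.
                self.index_types.append('chk')
                self.index_sizes.append(chk_size)

        def index_names(self, pack_name):
            # One index file per type, e.g. NAME.rix, NAME.iix, ... NAME.cix
            suffix = {'revision': '.rix', 'inventory': '.iix',
                      'text': '.tix', 'signature': '.six', 'chk': '.cix'}
            return [pack_name + suffix[t] for t in self.index_types]

    if __name__ == '__main__':
        plain = PackIndexSet([10, 20, 30, 5])
        chk = PackIndexSet([10, 20, 30, 5], chk_size=7)
        print(plain.index_names('0123abcd'))   # four index files
        print(chk.index_names('0123abcd'))     # five files, including .cix
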
@@ -301,21 +301,31 @@
         self.upload_transport.delete(self.file_name())
         indices = [self.revision_index, self.inventory_index, self.text_index,
             self.signature_index]
+        if self.chk_index is not None:
+            indices.append(self.chk_index)
         for index in indices:
             index._transport.delete(index._name)
 
     def finish(self):
         self._check_references()
-        new_name = '../packs/' + self.file_name()
-        self.upload_transport.rename(self.file_name(), new_name)
-        for index_type in ['revision', 'inventory', 'text', 'signature']:
+        index_types = ['revision', 'inventory', 'text', 'signature']
+        if self.chk_index is not None:
+            index_types.append('chk')
+        for index_type in index_types:
             old_name = self.index_name(index_type, self.name)
             new_name = '../indices/' + old_name
             self.upload_transport.rename(old_name, new_name)
             self._replace_index_with_readonly(index_type)
+        new_name = '../packs/' + self.file_name()
+        self.upload_transport.rename(self.file_name(), new_name)
         self._state = 'finished'
 
     def _get_external_refs(self, index):
+        """Return compression parents for this index that are not present.
+
+        This returns any compression parents that are referenced by this index,
+        which are not contained *in* this index. They may be present elsewhere.
+        """
         return index.external_references(1)
 
 
@@ -1292,7 +1302,7 @@
         # space (we only topo sort the revisions, which is smaller).
         topo_order = tsort.topo_sort(ancestors)
         rev_order = dict(zip(topo_order, range(len(topo_order))))
-        bad_texts.sort(key=lambda key:rev_order[key[0][1]])
+        bad_texts.sort(key=lambda key:rev_order.get(key[0][1], 0))
         transaction = repo.get_transaction()
         file_id_index = GraphIndexPrefixAdapter(
             self.new_pack.text_index,
@@ -1352,6 +1362,7 @@
     """
 
     pack_factory = NewPack
+    resumed_pack_factory = ResumedPack
 
     def __init__(self, repo, transport, index_transport, upload_transport,
                  pack_transport, index_builder_class, index_class,
@@ -1443,12 +1454,12 @@
         in synchronisation with certain steps. Otherwise the names collection
         is not flushed.
 
-        :return: True if packing took place.
+        :return: Something evaluating true if packing took place.
         """
         while True:
             try:
                 return self._do_autopack()
-            except errors.RetryAutopack, e:
+            except errors.RetryAutopack:
                 # If we get a RetryAutopack exception, we should abort the
                 # current action, and retry.
                 pass
@@ -1458,7 +1469,7 @@
         total_revisions = self.revision_index.combined_index.key_count()
         total_packs = len(self._names)
         if self._max_pack_count(total_revisions) >= total_packs:
-            return False
+            return None
         # determine which packs need changing
         pack_distribution = self.pack_distribution(total_revisions)
         existing_packs = []
@@ -1486,10 +1497,10 @@
             'containing %d revisions. Packing %d files into %d affecting %d'
             ' revisions', self, total_packs, total_revisions, num_old_packs,
             num_new_packs, num_revs_affected)
-        self._execute_pack_operations(pack_operations,
+        result = self._execute_pack_operations(pack_operations,
                                       reload_func=self._restart_autopack)
         mutter('Auto-packing repository %s completed', self)
-        return True
+        return result
 
     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):
@@ -1497,7 +1508,7 @@
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
         :param _packer_class: The class of packer to use (default: Packer).
-        :return: None.
+        :return: The new pack names.
         """
         for revision_count, packs in pack_operations:
             # we may have no-ops from the setup logic
@@ -1519,10 +1530,11 @@
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
         # packs
-        self._save_pack_names(clear_obsolete_packs=True)
+        result = self._save_pack_names(clear_obsolete_packs=True)
         # Move the old packs out of the way now they are no longer referenced.
         for revision_count, packs in pack_operations:
             self._obsolete_packs(packs)
+        return result
 
     def _flush_new_pack(self):
         if self._new_pack is not None:
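
The three hunks above change the return contract of the autopack path: instead of a bare True/False, the names written by _save_pack_names() are propagated out of _execute_pack_operations() and autopack() to the caller. A minimal, self-contained sketch of that "return the names you just saved" pattern follows; the function names here are invented for illustration and only the shape mirrors the diff:

    # Illustrative sketch of propagating saved pack names up the call chain.
    def save_pack_names(existing, pending):
        """Persist pending packs; return only the names that are new on disk."""
        new_names = [name for name in pending if name not in existing]
        existing.update(new_names)
        return new_names

    def execute_pack_operations(existing, operations):
        # ... combine packs and write their indices ...
        pending = ['pack-%d' % i for i, _ in enumerate(operations)]
        return save_pack_names(existing, pending)

    def autopack(existing, operations):
        if not operations:
            return None                # nothing evaluating true: no packing done
        return execute_pack_operations(existing, operations)

    if __name__ == '__main__':
        on_disk = set()
        print(autopack(on_disk, [('combine', ['a', 'b'])]))   # e.g. ['pack-0']
        print(autopack(on_disk, []))                          # None
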
@@ -1538,29 +1550,26 @@
 
     def _already_packed(self):
         """Is the collection already packed?"""
-        return len(self._names) < 2
+        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
 
-    def pack(self):
+    def pack(self, hint=None):
         """Pack the pack collection totally."""
         self.ensure_loaded()
         total_packs = len(self._names)
         if self._already_packed():
-            # This is arguably wrong because we might not be optimal, but for
-            # now lets leave it in. (e.g. reconcile -> one pack. But not
-            # optimal.
             return
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
         # policy.
         mutter('Packing repository %s, which has %d pack files, '
-            'containing %d revisions into 1 packs.', self, total_packs,
-            total_revisions)
+            'containing %d revisions with hint %r.', self, total_packs,
+            total_revisions, hint)
         # determine which packs need changing
-        pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            pack_operations[-1][0] += pack.get_revision_count()
-            pack_operations[-1][1].append(pack)
+            if not hint or pack.name in hint:
+                pack_operations[-1][0] += pack.get_revision_count()
+                pack_operations[-1][1].append(pack)
         self._execute_pack_operations(pack_operations, OptimisingPacker)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
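
In the hunk above, pack() gains an optional hint, a collection of pack names: when a hint is given only the named packs are fed into the single pack operation, otherwise everything is repacked as before. A self-contained sketch of that filtering, with invented names for illustration:

    # Sketch of hint-based filtering in a total repack (illustrative names).
    def plan_total_pack(all_packs, hint=None):
        """Return [revision_count, pack_names] covering either every pack or,
        when hint is given, only the packs whose name appears in hint."""
        operation = [0, []]
        for name, revision_count in all_packs:
            if not hint or name in hint:
                operation[0] += revision_count
                operation[1].append(name)
        return operation

    if __name__ == '__main__':
        packs = [('aaaa', 100), ('bbbb', 5), ('cccc', 7)]
        print(plan_total_pack(packs))                          # repack everything
        print(plan_total_pack(packs, hint=['bbbb', 'cccc']))   # only the hinted packs
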
@@ -1680,9 +1689,14 @@
             inv_index = self._make_index(name, '.iix', resume=True)
             txt_index = self._make_index(name, '.tix', resume=True)
             sig_index = self._make_index(name, '.six', resume=True)
-            result = ResumedPack(name, rev_index, inv_index, txt_index,
-                sig_index, self._upload_transport, self._pack_transport,
-                self._index_transport, self)
+            if self.chk_index is not None:
+                chk_index = self._make_index(name, '.cix', resume=True)
+            else:
+                chk_index = None
+            result = self.resumed_pack_factory(name, rev_index, inv_index,
+                txt_index, sig_index, self._upload_transport,
+                self._pack_transport, self._index_transport, self,
+                chk_index=chk_index)
         except errors.NoSuchFile, e:
             raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
         self.add_pack_to_memory(result)
@@ -1809,14 +1823,11 @@
     def reset(self):
         """Clear all cached data."""
         # cached revision data
-        self.repo._revision_knit = None
        self.revision_index.clear()
         # cached signature data
-        self.repo._signature_knit = None
         self.signature_index.clear()
         # cached file text data
         self.text_index.clear()
-        self.repo._text_knit = None
         # cached inventory data
         self.inventory_index.clear()
         # cached chk data
@@ -1920,6 +1931,7 @@
 
         :param clear_obsolete_packs: If True, clear out the contents of the
             obsolete_packs directory.
+        :return: A list of the names saved that were not previously on disk.
         """
         self.lock_names()
         try:
@@ -1940,6 +1952,7 @@
             self._unlock_names()
         # synchronise the memory packs list with what we just wrote:
         self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+        return [new_node[0][0] for new_node in new_nodes]
 
     def reload_pack_names(self):
         """Sync our pack listing with what is present in the repository.
@@ -2035,7 +2048,6 @@
                 except KeyError:
                     pass
         del self._resumed_packs[:]
-        self.repo._text_knit = None
 
     def _remove_resumed_pack_indices(self):
         for resumed_pack in self._resumed_packs:
@@ -2080,8 +2092,7 @@
             if not self.autopack():
                 # when autopack takes no steps, the names list is still
                 # unsaved.
-                self._save_pack_names()
-        self.repo._text_knit = None
+                return self._save_pack_names()
 
     def _suspend_write_group(self):
         tokens = [pack.name for pack in self._resumed_packs]
@@ -2095,7 +2106,6 @@
             self._new_pack.abort()
             self._new_pack = None
         self._remove_resumed_pack_indices()
-        self.repo._text_knit = None
         return tokens
 
     def _resume_write_group(self, tokens):
@@ -2202,6 +2212,7 @@
                     % (self._format, self.bzrdir.transport.base))
 
     def _abort_write_group(self):
+        self.revisions._index._key_dependencies.refs.clear()
         self._pack_collection._abort_write_group()
 
     def _find_inconsistent_revision_parents(self):
@@ -2250,5 +2261,10 @@
             pb.finished()
         return result
 
+    def _get_source(self, to_format):
+        if to_format.network_name() == self._format.network_name():
+            return KnitPackStreamSource(self, to_format)
+        return super(KnitPackRepository, self)._get_source(to_format)
+
     def _make_parents_provider(self):
         return graph.CachingParentsProvider(self)
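
The hunk above makes _get_source() return the specialised KnitPackStreamSource (added near the end of this diff) whenever the target format reports the same network name as the source, so same-format fetches avoid any format conversion; anything else falls back to the generic source. A tiny sketch of that dispatch on a format "network name" (the names below are illustrative, not bzrlib's API):

    # Sketch: pick an optimised source only when both sides speak the same
    # on-the-wire format (illustrative names).
    def choose_stream_source(source_network_name, target_network_name):
        if source_network_name == target_network_name:
            return 'same-format stream source'   # no conversion needed
        return 'generic stream source'           # fall back to the base class

    if __name__ == '__main__':
        print(choose_stream_source('pack-0.92', 'pack-0.92'))
        print(choose_stream_source('pack-0.92', '2a'))
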
 
@@ -2262,16 +2278,22 @@
         self._pack_collection._start_write_group()
 
     def _commit_write_group(self):
+        self.revisions._index._key_dependencies.refs.clear()
         return self._pack_collection._commit_write_group()
 
     def suspend_write_group(self):
         # XXX check self._write_group is self.get_transaction()?
         tokens = self._pack_collection._suspend_write_group()
+        self.revisions._index._key_dependencies.refs.clear()
         self._write_group = None
         return tokens
 
     def _resume_write_group(self, tokens):
         self._start_write_group()
-        self._pack_collection._resume_write_group(tokens)
+        try:
+            self._pack_collection._resume_write_group(tokens)
+        except errors.UnresumableWriteGroup:
+            self._abort_write_group()
+            raise
         for pack in self._pack_collection._resumed_packs:
             self.revisions._index.scan_unvalidated_index(pack.revision_index)
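
In the hunk above, _resume_write_group() now wraps the collection-level resume in try/except: if the resume raises UnresumableWriteGroup, the partially resumed state is aborted before the exception is re-raised, so the repository is not left holding a half-resumed write group. A minimal sketch of that abort-then-reraise pattern (the exception class and helpers here are stand-ins, not the bzrlib objects):

    # Sketch of "clean up, then re-raise" around a resumable operation.
    class UnresumableWriteGroup(Exception):
        pass

    def resume_write_group(tokens, do_resume, abort):
        """Resume from tokens; on failure abort any partial state and re-raise."""
        try:
            do_resume(tokens)
        except UnresumableWriteGroup:
            abort()      # roll back whatever the failed resume left behind
            raise

    if __name__ == '__main__':
        state = {'aborted': False}
        def failing_resume(tokens):
            raise UnresumableWriteGroup('missing pack %r' % (tokens[0],))
        try:
            resume_write_group(['deadbeef'], failing_resume,
                               lambda: state.update(aborted=True))
        except UnresumableWriteGroup as e:
            print('resume failed: %s; aborted=%s' % (e, state['aborted']))
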
 
@@ -2295,10 +2317,10 @@
         self._write_lock_count += 1
         if self._write_lock_count == 1:
             self._transaction = transactions.WriteTransaction()
+        if not locked:
             for repo in self._fallback_repositories:
                 # Writes don't affect fallback repos
                 repo.lock_read()
-        if not locked:
             self._refresh_data()
 
     def lock_read(self):
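
This hunk and the lock_read() hunk that follows take the fallback-repository read locks only when the call actually acquired the lock (that is, the repository was not already locked), so re-entrant lock calls no longer pile up extra locks on stacked-on repositories. A self-contained sketch of that "lock fallbacks only on the outermost acquisition" behaviour, with invented class names:

    # Sketch: lock fallback repositories only on the outermost lock call
    # (illustrative classes, not the bzrlib Repository implementation).
    class FallbackRepo(object):
        def __init__(self):
            self.read_locks = 0
        def lock_read(self):
            self.read_locks += 1

    class Repo(object):
        def __init__(self, fallbacks):
            self._fallback_repositories = fallbacks
            self._lock_count = 0
        def lock_write(self):
            locked = self._lock_count > 0    # were we already locked?
            self._lock_count += 1
            if not locked:
                # Writes don't affect fallback repos, so a read lock suffices,
                # and it is only taken when the lock is first acquired.
                for repo in self._fallback_repositories:
                    repo.lock_read()

    if __name__ == '__main__':
        fb = FallbackRepo()
        r = Repo([fb])
        r.lock_write()
        r.lock_write()            # re-entrant: fallback not locked again
        print(fb.read_locks)      # 1
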
@@ -2307,10 +2329,9 @@
             self._write_lock_count += 1
         else:
             self.control_files.lock_read()
+        if not locked:
             for repo in self._fallback_repositories:
-                # Writes don't affect fallback repos
                 repo.lock_read()
-        if not locked:
             self._refresh_data()
 
     def leave_lock_in_place(self):
@@ -2322,13 +2343,13 @@
         raise NotImplementedError(self.dont_leave_lock_in_place)
 
     @needs_write_lock
-    def pack(self):
+    def pack(self, hint=None):
         """Compress the data within the repository.
 
         This will pack all the data to a single pack. In future it may
         recompress deltas or do other such expensive operations.
         """
-        self._pack_collection.pack()
+        self._pack_collection.pack(hint=hint)
 
     @needs_write_lock
     def reconcile(self, other=None, thorough=False):
@@ -2356,13 +2377,86 @@
                 transaction = self._transaction
                 self._transaction = None
                 transaction.finish()
-                for repo in self._fallback_repositories:
-                    repo.unlock()
         else:
             self.control_files.unlock()
+
+        if not self.is_locked():
             for repo in self._fallback_repositories:
                 repo.unlock()
 
 
+class KnitPackStreamSource(StreamSource):
+    """A StreamSource used to transfer data between same-format KnitPack repos.
+
+    This source assumes:
+        1) Same serialization format for all objects
+        2) Same root information
+        3) XML format inventories
+        4) Atomic inserts (so we can stream inventory texts before text
+           content)
+        5) No chk_bytes
+    """
+
+    def __init__(self, from_repository, to_format):
+        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
+        self._text_keys = None
+        self._text_fetch_order = 'unordered'
+
+    def _get_filtered_inv_stream(self, revision_ids):
+        from_repo = self.from_repository
+        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
+        parent_keys = [(p,) for p in parent_ids]
+        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
+        parent_text_keys = set(find_text_keys(
+            from_repo._inventory_xml_lines_for_keys(parent_keys)))
+        content_text_keys = set()
+        knit = KnitVersionedFiles(None, None)
+        factory = KnitPlainFactory()
+        def find_text_keys_from_content(record):
+            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
+                raise ValueError("Unknown content storage kind for"
+                    " inventory text: %s" % (record.storage_kind,))
+            # It's a knit record, it has a _raw_record field (even if it was
+            # reconstituted from a network stream).
+            raw_data = record._raw_record
+            # read the entire thing
+            revision_id = record.key[-1]
+            content, _ = knit._parse_record(revision_id, raw_data)
+            if record.storage_kind == 'knit-delta-gz':
+                line_iterator = factory.get_linedelta_content(content)
+            elif record.storage_kind == 'knit-ft-gz':
+                line_iterator = factory.get_fulltext_content(content)
+            content_text_keys.update(find_text_keys(
+                [(line, revision_id) for line in line_iterator]))
+        revision_keys = [(r,) for r in revision_ids]
+        def _filtered_inv_stream():
+            source_vf = from_repo.inventories
+            stream = source_vf.get_record_stream(revision_keys,
+                                                 'unordered', False)
+            for record in stream:
+                if record.storage_kind == 'absent':
+                    raise errors.NoSuchRevision(from_repo, record.key)
+                find_text_keys_from_content(record)
+                yield record
+            self._text_keys = content_text_keys - parent_text_keys
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are the identical network name.
+        text_stream = self.from_repository.texts.get_record_stream(
+                        self._text_keys, self._text_fetch_order, False)
+        return ('texts', text_stream)
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream(revision_ids)
+        yield self._get_text_stream()
+
+
+
 class RepositoryFormatPack(MetaDirRepositoryFormat):
     """Format logic for pack structured repositories.