/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

  • Committer: Jonathan Lange
  • Date: 2009-12-09 09:20:42 UTC
  • mfrom: (4881 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4907.
  • Revision ID: jml@canonical.com-20091209092042-s2zgqcf8f39yzxpj
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    chk_map,
27
27
    debug,
28
28
    errors,
 
29
    fetch as _mod_fetch,
29
30
    fifo_cache,
30
31
    generate_ids,
31
32
    gpg,
32
33
    graph,
33
34
    inventory,
 
35
    inventory_delta,
34
36
    lazy_regex,
35
37
    lockable_files,
36
38
    lockdir,
48
50
from bzrlib.testament import Testament
49
51
""")
50
52
 
51
 
from bzrlib.decorators import needs_read_lock, needs_write_lock
 
53
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
52
54
from bzrlib.inter import InterObject
53
55
from bzrlib.inventory import (
54
56
    Inventory,
56
58
    ROOT_ID,
57
59
    entry_factory,
58
60
    )
 
61
from bzrlib.lock import _RelockDebugMixin
59
62
from bzrlib import registry
60
63
from bzrlib.trace import (
61
64
    log_exception_quietly, note, mutter, mutter_callsite, warning)
204
207
            # an inventory delta was accumulated without creating a new
205
208
            # inventory.
206
209
            basis_id = self.basis_delta_revision
207
 
            self.inv_sha1 = self.repository.add_inventory_by_delta(
 
210
            # We ignore the 'inventory' returned by add_inventory_by_delta
 
211
            # because self.new_inventory is used to hint to the rest of the
 
212
            # system what code path was taken
 
213
            self.inv_sha1, _ = self.repository.add_inventory_by_delta(
208
214
                basis_id, self._basis_delta, self._new_revision_id,
209
215
                self.parents)
210
216
        else:
464
470
            if content_summary[2] is None:
465
471
                raise ValueError("Files must not have executable = None")
466
472
            if not store:
467
 
                if (# if the file length changed we have to store:
468
 
                    parent_entry.text_size != content_summary[1] or
469
 
                    # if the exec bit has changed we have to store:
 
473
                # We can't trust a check of the file length because of content
 
474
                # filtering...
 
475
                if (# if the exec bit has changed we have to store:
470
476
                    parent_entry.executable != content_summary[2]):
471
477
                    store = True
472
478
                elif parent_entry.text_sha1 == content_summary[3]:
539
545
                ie.revision = parent_entry.revision
540
546
                return self._get_delta(ie, basis_inv, path), False, None
541
547
            ie.reference_revision = content_summary[3]
 
548
            if ie.reference_revision is None:
 
549
                raise AssertionError("invalid content_summary for nested tree: %r"
 
550
                    % (content_summary,))
542
551
            self._add_text_to_weave(ie.file_id, '', heads, None)
543
552
        else:
544
553
            raise NotImplementedError('unknown kind')
806
815
                seen_root = True
807
816
        self.new_inventory = None
808
817
        if len(inv_delta):
 
818
            # This should perhaps be guarded by a check that the basis we
 
819
            # commit against is the basis for the commit and if not do a delta
 
820
            # against the basis.
809
821
            self._any_changes = True
810
822
        if not seen_root:
811
823
            # housekeeping root entry changes do not affect no-change commits.
848
860
######################################################################
849
861
# Repositories
850
862
 
851
 
class Repository(object):
 
863
 
 
864
class Repository(_RelockDebugMixin):
852
865
    """Repository holding history for one or more branches.
853
866
 
854
867
    The repository holds and retrieves historical information including
923
936
        """
924
937
        if self._write_group is not self.get_transaction():
925
938
            # has an unlock or relock occured ?
 
939
            if suppress_errors:
 
940
                mutter(
 
941
                '(suppressed) mismatched lock context and write group. %r, %r',
 
942
                self._write_group, self.get_transaction())
 
943
                return
926
944
            raise errors.BzrError(
927
945
                'mismatched lock context and write group. %r, %r' %
928
946
                (self._write_group, self.get_transaction()))
1062
1080
        check_content=True):
1063
1081
        """Store lines in inv_vf and return the sha1 of the inventory."""
1064
1082
        parents = [(parent,) for parent in parents]
1065
 
        return self.inventories.add_lines((revision_id,), parents, lines,
 
1083
        result = self.inventories.add_lines((revision_id,), parents, lines,
1066
1084
            check_content=check_content)[0]
 
1085
        self.inventories._access.flush()
 
1086
        return result
1067
1087
 
1068
1088
    def add_revision(self, revision_id, rev, inv=None, config=None):
1069
1089
        """Add rev to the revision store as revision_id.
1145
1165
        # The old API returned a list, should this actually be a set?
1146
1166
        return parent_map.keys()
1147
1167
 
 
1168
    def _check_inventories(self, checker):
 
1169
        """Check the inventories found from the revision scan.
 
1170
        
 
1171
        This is responsible for verifying the sha1 of inventories and
 
1172
        creating a pending_keys set that covers data referenced by inventories.
 
1173
        """
 
1174
        bar = ui.ui_factory.nested_progress_bar()
 
1175
        try:
 
1176
            self._do_check_inventories(checker, bar)
 
1177
        finally:
 
1178
            bar.finished()
 
1179
 
 
1180
    def _do_check_inventories(self, checker, bar):
 
1181
        """Helper for _check_inventories."""
 
1182
        revno = 0
 
1183
        keys = {'chk_bytes':set(), 'inventories':set(), 'texts':set()}
 
1184
        kinds = ['chk_bytes', 'texts']
 
1185
        count = len(checker.pending_keys)
 
1186
        bar.update("inventories", 0, 2)
 
1187
        current_keys = checker.pending_keys
 
1188
        checker.pending_keys = {}
 
1189
        # Accumulate current checks.
 
1190
        for key in current_keys:
 
1191
            if key[0] != 'inventories' and key[0] not in kinds:
 
1192
                checker._report_items.append('unknown key type %r' % (key,))
 
1193
            keys[key[0]].add(key[1:])
 
1194
        if keys['inventories']:
 
1195
            # NB: output order *should* be roughly sorted - topo or
 
1196
            # inverse topo depending on repository - either way decent
 
1197
            # to just delta against. However, pre-CHK formats didn't
 
1198
            # try to optimise inventory layout on disk. As such the
 
1199
            # pre-CHK code path does not use inventory deltas.
 
1200
            last_object = None
 
1201
            for record in self.inventories.check(keys=keys['inventories']):
 
1202
                if record.storage_kind == 'absent':
 
1203
                    checker._report_items.append(
 
1204
                        'Missing inventory {%s}' % (record.key,))
 
1205
                else:
 
1206
                    last_object = self._check_record('inventories', record,
 
1207
                        checker, last_object,
 
1208
                        current_keys[('inventories',) + record.key])
 
1209
            del keys['inventories']
 
1210
        else:
 
1211
            return
 
1212
        bar.update("texts", 1)
 
1213
        while (checker.pending_keys or keys['chk_bytes']
 
1214
            or keys['texts']):
 
1215
            # Something to check.
 
1216
            current_keys = checker.pending_keys
 
1217
            checker.pending_keys = {}
 
1218
            # Accumulate current checks.
 
1219
            for key in current_keys:
 
1220
                if key[0] not in kinds:
 
1221
                    checker._report_items.append('unknown key type %r' % (key,))
 
1222
                keys[key[0]].add(key[1:])
 
1223
            # Check the outermost kind only - inventories || chk_bytes || texts
 
1224
            for kind in kinds:
 
1225
                if keys[kind]:
 
1226
                    last_object = None
 
1227
                    for record in getattr(self, kind).check(keys=keys[kind]):
 
1228
                        if record.storage_kind == 'absent':
 
1229
                            checker._report_items.append(
 
1230
                                'Missing %s {%s}' % (kind, record.key,))
 
1231
                        else:
 
1232
                            last_object = self._check_record(kind, record,
 
1233
                                checker, last_object, current_keys[(kind,) + record.key])
 
1234
                    keys[kind] = set()
 
1235
                    break
 
1236
 
 
1237
    def _check_record(self, kind, record, checker, last_object, item_data):
 
1238
        """Check a single text from this repository."""
 
1239
        if kind == 'inventories':
 
1240
            rev_id = record.key[0]
 
1241
            inv = self.deserialise_inventory(rev_id,
 
1242
                record.get_bytes_as('fulltext'))
 
1243
            if last_object is not None:
 
1244
                delta = inv._make_delta(last_object)
 
1245
                for old_path, path, file_id, ie in delta:
 
1246
                    if ie is None:
 
1247
                        continue
 
1248
                    ie.check(checker, rev_id, inv)
 
1249
            else:
 
1250
                for path, ie in inv.iter_entries():
 
1251
                    ie.check(checker, rev_id, inv)
 
1252
            if self._format.fast_deltas:
 
1253
                return inv
 
1254
        elif kind == 'chk_bytes':
 
1255
            # No code written to check chk_bytes for this repo format.
 
1256
            checker._report_items.append(
 
1257
                'unsupported key type chk_bytes for %s' % (record.key,))
 
1258
        elif kind == 'texts':
 
1259
            self._check_text(record, checker, item_data)
 
1260
        else:
 
1261
            checker._report_items.append(
 
1262
                'unknown key type %s for %s' % (kind, record.key))
 
1263
 
 
1264
    def _check_text(self, record, checker, item_data):
 
1265
        """Check a single text."""
 
1266
        # Check it is extractable.
 
1267
        # TODO: check length.
 
1268
        if record.storage_kind == 'chunked':
 
1269
            chunks = record.get_bytes_as(record.storage_kind)
 
1270
            sha1 = osutils.sha_strings(chunks)
 
1271
            length = sum(map(len, chunks))
 
1272
        else:
 
1273
            content = record.get_bytes_as('fulltext')
 
1274
            sha1 = osutils.sha_string(content)
 
1275
            length = len(content)
 
1276
        if item_data and sha1 != item_data[1]:
 
1277
            checker._report_items.append(
 
1278
                'sha1 mismatch: %s has sha1 %s expected %s referenced by %s' %
 
1279
                (record.key, sha1, item_data[1], item_data[2]))
 
1280
 
1148
1281
    @staticmethod
1149
1282
    def create(a_bzrdir):
1150
1283
        """Construct the current default format repository in a_bzrdir."""
1183
1316
        self._inventory_entry_cache = fifo_cache.FIFOCache(10*1024)
1184
1317
 
1185
1318
    def __repr__(self):
1186
 
        return '%s(%r)' % (self.__class__.__name__,
1187
 
                           self.base)
 
1319
        if self._fallback_repositories:
 
1320
            return '%s(%r, fallback_repositories=%r)' % (
 
1321
                self.__class__.__name__,
 
1322
                self.base,
 
1323
                self._fallback_repositories)
 
1324
        else:
 
1325
            return '%s(%r)' % (self.__class__.__name__,
 
1326
                               self.base)
 
1327
 
 
1328
    def _has_same_fallbacks(self, other_repo):
 
1329
        """Returns true if the repositories have the same fallbacks."""
 
1330
        my_fb = self._fallback_repositories
 
1331
        other_fb = other_repo._fallback_repositories
 
1332
        if len(my_fb) != len(other_fb):
 
1333
            return False
 
1334
        for f, g in zip(my_fb, other_fb):
 
1335
            if not f.has_same_location(g):
 
1336
                return False
 
1337
        return True
1188
1338
 
1189
1339
    def has_same_location(self, other):
1190
1340
        """Returns a boolean indicating if this repository is at the same
1236
1386
        locked = self.is_locked()
1237
1387
        result = self.control_files.lock_write(token=token)
1238
1388
        if not locked:
 
1389
            self._note_lock('w')
1239
1390
            for repo in self._fallback_repositories:
1240
1391
                # Writes don't affect fallback repos
1241
1392
                repo.lock_read()
1246
1397
        locked = self.is_locked()
1247
1398
        self.control_files.lock_read()
1248
1399
        if not locked:
 
1400
            self._note_lock('r')
1249
1401
            for repo in self._fallback_repositories:
1250
1402
                repo.lock_read()
1251
1403
            self._refresh_data()
1398
1550
        """Commit the contents accrued within the current write group.
1399
1551
 
1400
1552
        :seealso: start_write_group.
 
1553
        
 
1554
        :return: it may return an opaque hint that can be passed to 'pack'.
1401
1555
        """
1402
1556
        if self._write_group is not self.get_transaction():
1403
1557
            # has an unlock or relock occured ?
1457
1611
        # but at the moment we're only checking for texts referenced by
1458
1612
        # inventories at the graph's edge.
1459
1613
        key_deps = self.revisions._index._key_dependencies
1460
 
        key_deps.add_keys(present_inventories)
 
1614
        key_deps.satisfy_refs_for_keys(present_inventories)
1461
1615
        referrers = frozenset(r[0] for r in key_deps.get_referrers())
1462
1616
        file_ids = self.fileids_altered_by_revision_ids(referrers)
1463
1617
        missing_texts = set()
1529
1683
            raise errors.InternalBzrError(
1530
1684
                "May not fetch while in a write group.")
1531
1685
        # fast path same-url fetch operations
1532
 
        if self.has_same_location(source) and fetch_spec is None:
 
1686
        # TODO: lift out to somewhere common with RemoteRepository
 
1687
        # <https://bugs.edge.launchpad.net/bzr/+bug/401646>
 
1688
        if (self.has_same_location(source)
 
1689
            and fetch_spec is None
 
1690
            and self._has_same_fallbacks(source)):
1533
1691
            # check that last_revision is in 'from' and then return a
1534
1692
            # no-operation.
1535
1693
            if (revision_id is not None and
1560
1718
        :param revprops: Optional dictionary of revision properties.
1561
1719
        :param revision_id: Optional revision id.
1562
1720
        """
 
1721
        if self._fallback_repositories:
 
1722
            raise errors.BzrError("Cannot commit from a lightweight checkout "
 
1723
                "to a stacked branch. See "
 
1724
                "https://bugs.launchpad.net/bzr/+bug/375013 for details.")
1563
1725
        result = self._commit_builder_class(self, parents, config,
1564
1726
            timestamp, timezone, committer, revprops, revision_id)
1565
1727
        self.start_write_group()
1566
1728
        return result
1567
1729
 
 
1730
    @only_raises(errors.LockNotHeld, errors.LockBroken)
1568
1731
    def unlock(self):
1569
1732
        if (self.control_files._lock_count == 1 and
1570
1733
            self.control_files._lock_mode == 'w'):
1692
1855
 
1693
1856
    @needs_read_lock
1694
1857
    def get_revisions(self, revision_ids):
1695
 
        """Get many revisions at once."""
 
1858
        """Get many revisions at once.
 
1859
        
 
1860
        Repositories that need to check data on every revision read should 
 
1861
        subclass this method.
 
1862
        """
1696
1863
        return self._get_revisions(revision_ids)
1697
1864
 
1698
1865
    @needs_read_lock
1699
1866
    def _get_revisions(self, revision_ids):
1700
1867
        """Core work logic to get many revisions without sanity checks."""
1701
 
        for rev_id in revision_ids:
1702
 
            if not rev_id or not isinstance(rev_id, basestring):
1703
 
                raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
 
1868
        revs = {}
 
1869
        for revid, rev in self._iter_revisions(revision_ids):
 
1870
            if rev is None:
 
1871
                raise errors.NoSuchRevision(self, revid)
 
1872
            revs[revid] = rev
 
1873
        return [revs[revid] for revid in revision_ids]
 
1874
 
 
1875
    def _iter_revisions(self, revision_ids):
 
1876
        """Iterate over revision objects.
 
1877
 
 
1878
        :param revision_ids: An iterable of revisions to examine. None may be
 
1879
            passed to request all revisions known to the repository. Note that
 
1880
            not all repositories can find unreferenced revisions; for those
 
1881
            repositories only referenced ones will be returned.
 
1882
        :return: An iterator of (revid, revision) tuples. Absent revisions (
 
1883
            those asked for but not available) are returned as (revid, None).
 
1884
        """
 
1885
        if revision_ids is None:
 
1886
            revision_ids = self.all_revision_ids()
 
1887
        else:
 
1888
            for rev_id in revision_ids:
 
1889
                if not rev_id or not isinstance(rev_id, basestring):
 
1890
                    raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
1704
1891
        keys = [(key,) for key in revision_ids]
1705
1892
        stream = self.revisions.get_record_stream(keys, 'unordered', True)
1706
 
        revs = {}
1707
1893
        for record in stream:
 
1894
            revid = record.key[0]
1708
1895
            if record.storage_kind == 'absent':
1709
 
                raise errors.NoSuchRevision(self, record.key[0])
1710
 
            text = record.get_bytes_as('fulltext')
1711
 
            rev = self._serializer.read_revision_from_string(text)
1712
 
            revs[record.key[0]] = rev
1713
 
        return [revs[revid] for revid in revision_ids]
 
1896
                yield (revid, None)
 
1897
            else:
 
1898
                text = record.get_bytes_as('fulltext')
 
1899
                rev = self._serializer.read_revision_from_string(text)
 
1900
                yield (revid, rev)
1714
1901
 
1715
1902
    @needs_read_lock
1716
1903
    def get_revision_xml(self, revision_id):
2071
2258
                batch_size]
2072
2259
            if not to_query:
2073
2260
                break
2074
 
            for rev_tree in self.revision_trees(to_query):
2075
 
                revision_id = rev_tree.get_revision_id()
 
2261
            for revision_id in to_query:
2076
2262
                parent_ids = ancestors[revision_id]
2077
2263
                for text_key in revision_keys[revision_id]:
2078
2264
                    pb.update("Calculating text parents", processed_texts)
2151
2337
        num_file_ids = len(file_ids)
2152
2338
        for file_id, altered_versions in file_ids.iteritems():
2153
2339
            if pb is not None:
2154
 
                pb.update("fetch texts", count, num_file_ids)
 
2340
                pb.update("Fetch texts", count, num_file_ids)
2155
2341
            count += 1
2156
2342
            yield ("file", file_id, altered_versions)
2157
2343
 
2178
2364
        """Get Inventory object by revision id."""
2179
2365
        return self.iter_inventories([revision_id]).next()
2180
2366
 
2181
 
    def iter_inventories(self, revision_ids):
 
2367
    def iter_inventories(self, revision_ids, ordering=None):
2182
2368
        """Get many inventories by revision_ids.
2183
2369
 
2184
2370
        This will buffer some or all of the texts used in constructing the
2186
2372
        time.
2187
2373
 
2188
2374
        :param revision_ids: The expected revision ids of the inventories.
 
2375
        :param ordering: optional ordering, e.g. 'topological'.  If not
 
2376
            specified, the order of revision_ids will be preserved (by
 
2377
            buffering if necessary).
2189
2378
        :return: An iterator of inventories.
2190
2379
        """
2191
2380
        if ((None in revision_ids)
2192
2381
            or (_mod_revision.NULL_REVISION in revision_ids)):
2193
2382
            raise ValueError('cannot get null revision inventory')
2194
 
        return self._iter_inventories(revision_ids)
 
2383
        return self._iter_inventories(revision_ids, ordering)
2195
2384
 
2196
 
    def _iter_inventories(self, revision_ids):
 
2385
    def _iter_inventories(self, revision_ids, ordering):
2197
2386
        """single-document based inventory iteration."""
2198
 
        for text, revision_id in self._iter_inventory_xmls(revision_ids):
 
2387
        inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
 
2388
        for text, revision_id in inv_xmls:
2199
2389
            yield self.deserialise_inventory(revision_id, text)
2200
2390
 
2201
 
    def _iter_inventory_xmls(self, revision_ids):
 
2391
    def _iter_inventory_xmls(self, revision_ids, ordering):
 
2392
        if ordering is None:
 
2393
            order_as_requested = True
 
2394
            ordering = 'unordered'
 
2395
        else:
 
2396
            order_as_requested = False
2202
2397
        keys = [(revision_id,) for revision_id in revision_ids]
2203
 
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
 
2398
        if not keys:
 
2399
            return
 
2400
        if order_as_requested:
 
2401
            key_iter = iter(keys)
 
2402
            next_key = key_iter.next()
 
2403
        stream = self.inventories.get_record_stream(keys, ordering, True)
2204
2404
        text_chunks = {}
2205
2405
        for record in stream:
2206
2406
            if record.storage_kind != 'absent':
2207
 
                text_chunks[record.key] = record.get_bytes_as('chunked')
 
2407
                chunks = record.get_bytes_as('chunked')
 
2408
                if order_as_requested:
 
2409
                    text_chunks[record.key] = chunks
 
2410
                else:
 
2411
                    yield ''.join(chunks), record.key[-1]
2208
2412
            else:
2209
2413
                raise errors.NoSuchRevision(self, record.key)
2210
 
        for key in keys:
2211
 
            chunks = text_chunks.pop(key)
2212
 
            yield ''.join(chunks), key[-1]
 
2414
            if order_as_requested:
 
2415
                # Yield as many results as we can while preserving order.
 
2416
                while next_key in text_chunks:
 
2417
                    chunks = text_chunks.pop(next_key)
 
2418
                    yield ''.join(chunks), next_key[-1]
 
2419
                    try:
 
2420
                        next_key = key_iter.next()
 
2421
                    except StopIteration:
 
2422
                        # We still want to fully consume the get_record_stream,
 
2423
                        # just in case it is not actually finished at this point
 
2424
                        next_key = None
 
2425
                        break
2213
2426
 
2214
2427
    def deserialise_inventory(self, revision_id, xml):
2215
2428
        """Transform the xml into an inventory object.
2236
2449
    @needs_read_lock
2237
2450
    def get_inventory_xml(self, revision_id):
2238
2451
        """Get inventory XML as a file object."""
2239
 
        texts = self._iter_inventory_xmls([revision_id])
 
2452
        texts = self._iter_inventory_xmls([revision_id], 'unordered')
2240
2453
        try:
2241
2454
            text, revision_id = texts.next()
2242
2455
        except StopIteration:
2456
2669
        for ((revision_id,), parent_keys) in \
2457
2670
                self.revisions.get_parent_map(query_keys).iteritems():
2458
2671
            if parent_keys:
2459
 
                result[revision_id] = tuple(parent_revid
2460
 
                    for (parent_revid,) in parent_keys)
 
2672
                result[revision_id] = tuple([parent_revid
 
2673
                    for (parent_revid,) in parent_keys])
2461
2674
            else:
2462
2675
                result[revision_id] = (_mod_revision.NULL_REVISION,)
2463
2676
        return result
2474
2687
                [parents_provider, other_repository._make_parents_provider()])
2475
2688
        return graph.Graph(parents_provider)
2476
2689
 
2477
 
    def _get_versioned_file_checker(self, text_key_references=None):
 
2690
    def _get_versioned_file_checker(self, text_key_references=None,
 
2691
        ancestors=None):
2478
2692
        """Return an object suitable for checking versioned files.
2479
2693
        
2480
2694
        :param text_key_references: if non-None, an already built
2482
2696
            to whether they were referred to by the inventory of the
2483
2697
            revision_id that they contain. If None, this will be
2484
2698
            calculated.
 
2699
        :param ancestors: Optional result from
 
2700
            self.get_graph().get_parent_map(self.all_revision_ids()) if already
 
2701
            available.
2485
2702
        """
2486
2703
        return _VersionedFileChecker(self,
2487
 
            text_key_references=text_key_references)
 
2704
            text_key_references=text_key_references, ancestors=ancestors)
2488
2705
 
2489
2706
    def revision_ids_to_search_result(self, result_set):
2490
2707
        """Convert a set of revision ids to a graph SearchResult."""
2540
2757
        return record.get_bytes_as('fulltext')
2541
2758
 
2542
2759
    @needs_read_lock
2543
 
    def check(self, revision_ids=None):
 
2760
    def check(self, revision_ids=None, callback_refs=None, check_repo=True):
2544
2761
        """Check consistency of all history of given revision_ids.
2545
2762
 
2546
2763
        Different repository implementations should override _check().
2547
2764
 
2548
2765
        :param revision_ids: A non-empty list of revision_ids whose ancestry
2549
2766
             will be checked.  Typically the last revision_id of a branch.
 
2767
        :param callback_refs: A dict of check-refs to resolve and callback
 
2768
            the check/_check method on the items listed as wanting the ref.
 
2769
            see bzrlib.check.
 
2770
        :param check_repo: If False do not check the repository contents, just 
 
2771
            calculate the data callback_refs requires and call them back.
2550
2772
        """
2551
 
        return self._check(revision_ids)
 
2773
        return self._check(revision_ids, callback_refs=callback_refs,
 
2774
            check_repo=check_repo)
2552
2775
 
2553
 
    def _check(self, revision_ids):
2554
 
        result = check.Check(self)
2555
 
        result.check()
 
2776
    def _check(self, revision_ids, callback_refs, check_repo):
 
2777
        result = check.Check(self, check_repo=check_repo)
 
2778
        result.check(callback_refs)
2556
2779
        return result
2557
2780
 
2558
2781
    def _warn_if_deprecated(self):
2848
3071
    # help), and for fetching when data won't have come from the same
2849
3072
    # compressor.
2850
3073
    pack_compresses = False
 
3074
    # Does the repository inventory storage understand references to trees?
 
3075
    supports_tree_reference = None
2851
3076
 
2852
3077
    def __str__(self):
2853
3078
        return "<%s>" % self.__class__.__name__
2869
3094
        """
2870
3095
        try:
2871
3096
            transport = a_bzrdir.get_repository_transport(None)
2872
 
            format_string = transport.get("format").read()
 
3097
            format_string = transport.get_bytes("format")
2873
3098
            return format_registry.get(format_string)
2874
3099
        except errors.NoSuchFile:
2875
3100
            raise errors.NoRepositoryPresent(a_bzrdir)
2957
3182
        raise NotImplementedError(self.network_name)
2958
3183
 
2959
3184
    def check_conversion_target(self, target_format):
2960
 
        raise NotImplementedError(self.check_conversion_target)
 
3185
        if self.rich_root_data and not target_format.rich_root_data:
 
3186
            raise errors.BadConversionTarget(
 
3187
                'Does not support rich root data.', target_format,
 
3188
                from_format=self)
 
3189
        if (self.supports_tree_reference and 
 
3190
            not getattr(target_format, 'supports_tree_reference', False)):
 
3191
            raise errors.BadConversionTarget(
 
3192
                'Does not support nested trees', target_format,
 
3193
                from_format=self)
2961
3194
 
2962
3195
    def open(self, a_bzrdir, _found=False):
2963
3196
        """Return an instance of this format for the bzrdir a_bzrdir.
3180
3413
                   provided a default one will be created.
3181
3414
        :return: None.
3182
3415
        """
3183
 
        from bzrlib.fetch import RepoFetcher
3184
 
        f = RepoFetcher(to_repository=self.target,
 
3416
        f = _mod_fetch.RepoFetcher(to_repository=self.target,
3185
3417
                               from_repository=self.source,
3186
3418
                               last_revision=revision_id,
3187
3419
                               fetch_spec=fetch_spec,
3360
3592
                self.target.texts.insert_record_stream(
3361
3593
                    self.source.texts.get_record_stream(
3362
3594
                        self.source.texts.keys(), 'topological', False))
3363
 
                pb.update('copying inventory', 0, 1)
 
3595
                pb.update('Copying inventory', 0, 1)
3364
3596
                self.target.inventories.insert_record_stream(
3365
3597
                    self.source.inventories.get_record_stream(
3366
3598
                        self.source.inventories.keys(), 'topological', False))
3492
3724
        # This is redundant with format.check_conversion_target(), however that
3493
3725
        # raises an exception, and we just want to say "False" as in we won't
3494
3726
        # support converting between these formats.
 
3727
        if 'IDS_never' in debug.debug_flags:
 
3728
            return False
3495
3729
        if source.supports_rich_root() and not target.supports_rich_root():
3496
3730
            return False
3497
3731
        if (source._format.supports_tree_reference
3498
3732
            and not target._format.supports_tree_reference):
3499
3733
            return False
 
3734
        if target._fallback_repositories and target._format.supports_chks:
 
3735
            # IDS doesn't know how to copy CHKs for the parent inventories it
 
3736
            # adds to stacked repos.
 
3737
            return False
 
3738
        if 'IDS_always' in debug.debug_flags:
 
3739
            return True
 
3740
        # Only use this code path for local source and target.  IDS does far
 
3741
        # too much IO (both bandwidth and roundtrips) over a network.
 
3742
        if not source.bzrdir.transport.base.startswith('file:///'):
 
3743
            return False
 
3744
        if not target.bzrdir.transport.base.startswith('file:///'):
 
3745
            return False
3500
3746
        return True
3501
3747
 
3502
 
    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
 
3748
    def _get_trees(self, revision_ids, cache):
 
3749
        possible_trees = []
 
3750
        for rev_id in revision_ids:
 
3751
            if rev_id in cache:
 
3752
                possible_trees.append((rev_id, cache[rev_id]))
 
3753
            else:
 
3754
                # Not cached, but inventory might be present anyway.
 
3755
                try:
 
3756
                    tree = self.source.revision_tree(rev_id)
 
3757
                except errors.NoSuchRevision:
 
3758
                    # Nope, parent is ghost.
 
3759
                    pass
 
3760
                else:
 
3761
                    cache[rev_id] = tree
 
3762
                    possible_trees.append((rev_id, tree))
 
3763
        return possible_trees
 
3764
 
 
3765
    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
3503
3766
        """Get the best delta and base for this revision.
3504
3767
 
3505
3768
        :return: (basis_id, delta)
3506
3769
        """
3507
 
        possible_trees = [(parent_id, cache[parent_id])
3508
 
                          for parent_id in parent_ids
3509
 
                           if parent_id in cache]
3510
 
        if len(possible_trees) == 0:
3511
 
            # There either aren't any parents, or the parents aren't in the
3512
 
            # cache, so just use the last converted tree
3513
 
            possible_trees.append((basis_id, cache[basis_id]))
3514
3770
        deltas = []
 
3771
        # Generate deltas against each tree, to find the shortest.
 
3772
        texts_possibly_new_in_tree = set()
3515
3773
        for basis_id, basis_tree in possible_trees:
3516
3774
            delta = tree.inventory._make_delta(basis_tree.inventory)
 
3775
            for old_path, new_path, file_id, new_entry in delta:
 
3776
                if new_path is None:
 
3777
                    # This file_id isn't present in the new rev, so we don't
 
3778
                    # care about it.
 
3779
                    continue
 
3780
                if not new_path:
 
3781
                    # Rich roots are handled elsewhere...
 
3782
                    continue
 
3783
                kind = new_entry.kind
 
3784
                if kind != 'directory' and kind != 'file':
 
3785
                    # No text record associated with this inventory entry.
 
3786
                    continue
 
3787
                # This is a directory or file that has changed somehow.
 
3788
                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
3517
3789
            deltas.append((len(delta), basis_id, delta))
3518
3790
        deltas.sort()
3519
3791
        return deltas[0][1:]
3520
3792
 
3521
 
    def _get_parent_keys(self, root_key, parent_map):
3522
 
        """Get the parent keys for a given root id."""
3523
 
        root_id, rev_id = root_key
3524
 
        # Include direct parents of the revision, but only if they used
3525
 
        # the same root_id and are heads.
3526
 
        parent_keys = []
3527
 
        for parent_id in parent_map[rev_id]:
3528
 
            if parent_id == _mod_revision.NULL_REVISION:
3529
 
                continue
3530
 
            if parent_id not in self._revision_id_to_root_id:
3531
 
                # We probably didn't read this revision, go spend the
3532
 
                # extra effort to actually check
3533
 
                try:
3534
 
                    tree = self.source.revision_tree(parent_id)
3535
 
                except errors.NoSuchRevision:
3536
 
                    # Ghost, fill out _revision_id_to_root_id in case we
3537
 
                    # encounter this again.
3538
 
                    # But set parent_root_id to None since we don't really know
3539
 
                    parent_root_id = None
3540
 
                else:
3541
 
                    parent_root_id = tree.get_root_id()
3542
 
                self._revision_id_to_root_id[parent_id] = None
3543
 
            else:
3544
 
                parent_root_id = self._revision_id_to_root_id[parent_id]
3545
 
            if root_id == parent_root_id:
3546
 
                # With stacking we _might_ want to refer to a non-local
3547
 
                # revision, but this code path only applies when we have the
3548
 
                # full content available, so ghosts really are ghosts, not just
3549
 
                # the edge of local data.
3550
 
                parent_keys.append((parent_id,))
3551
 
            else:
3552
 
                # root_id may be in the parent anyway.
3553
 
                try:
3554
 
                    tree = self.source.revision_tree(parent_id)
3555
 
                except errors.NoSuchRevision:
3556
 
                    # ghost, can't refer to it.
3557
 
                    pass
3558
 
                else:
3559
 
                    try:
3560
 
                        parent_keys.append((tree.inventory[root_id].revision,))
3561
 
                    except errors.NoSuchId:
3562
 
                        # not in the tree
3563
 
                        pass
3564
 
        g = graph.Graph(self.source.revisions)
3565
 
        heads = g.heads(parent_keys)
3566
 
        selected_keys = []
3567
 
        for key in parent_keys:
3568
 
            if key in heads and key not in selected_keys:
3569
 
                selected_keys.append(key)
3570
 
        return tuple([(root_id,)+ key for key in selected_keys])
3571
 
 
3572
 
    def _new_root_data_stream(self, root_keys_to_create, parent_map):
3573
 
        for root_key in root_keys_to_create:
3574
 
            parent_keys = self._get_parent_keys(root_key, parent_map)
3575
 
            yield versionedfile.FulltextContentFactory(root_key,
3576
 
                parent_keys, None, '')
3577
 
 
3578
 
    def _fetch_batch(self, revision_ids, basis_id, cache):
 
3793
    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
 
3794
        """Find all parent revisions that are absent, but for which the
 
3795
        inventory is present, and copy those inventories.
 
3796
 
 
3797
        This is necessary to preserve correctness when the source is stacked
 
3798
        without fallbacks configured.  (Note that in cases like upgrade the
 
3799
        source may be not have _fallback_repositories even though it is
 
3800
        stacked.)
 
3801
        """
 
3802
        parent_revs = set()
 
3803
        for parents in parent_map.values():
 
3804
            parent_revs.update(parents)
 
3805
        present_parents = self.source.get_parent_map(parent_revs)
 
3806
        absent_parents = set(parent_revs).difference(present_parents)
 
3807
        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
 
3808
            (rev_id,) for rev_id in absent_parents)
 
3809
        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
 
3810
        for parent_tree in self.source.revision_trees(parent_inv_ids):
 
3811
            current_revision_id = parent_tree.get_revision_id()
 
3812
            parents_parents_keys = parent_invs_keys_for_stacking[
 
3813
                (current_revision_id,)]
 
3814
            parents_parents = [key[-1] for key in parents_parents_keys]
 
3815
            basis_id = _mod_revision.NULL_REVISION
 
3816
            basis_tree = self.source.revision_tree(basis_id)
 
3817
            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
 
3818
            self.target.add_inventory_by_delta(
 
3819
                basis_id, delta, current_revision_id, parents_parents)
 
3820
            cache[current_revision_id] = parent_tree
 
3821
 
 
3822
    def _fetch_batch(self, revision_ids, basis_id, cache, a_graph=None):
3579
3823
        """Fetch across a few revisions.
3580
3824
 
3581
3825
        :param revision_ids: The revisions to copy
3582
3826
        :param basis_id: The revision_id of a tree that must be in cache, used
3583
3827
            as a basis for delta when no other base is available
3584
3828
        :param cache: A cache of RevisionTrees that we can use.
 
3829
        :param a_graph: A Graph object to determine the heads() of the
 
3830
            rich-root data stream.
3585
3831
        :return: The revision_id of the last converted tree. The RevisionTree
3586
3832
            for it will be in cache
3587
3833
        """
3593
3839
        pending_deltas = []
3594
3840
        pending_revisions = []
3595
3841
        parent_map = self.source.get_parent_map(revision_ids)
 
3842
        self._fetch_parent_invs_for_stacking(parent_map, cache)
3596
3843
        for tree in self.source.revision_trees(revision_ids):
 
3844
            # Find a inventory delta for this revision.
 
3845
            # Find text entries that need to be copied, too.
3597
3846
            current_revision_id = tree.get_revision_id()
3598
3847
            parent_ids = parent_map.get(current_revision_id, ())
 
3848
            parent_trees = self._get_trees(parent_ids, cache)
 
3849
            possible_trees = list(parent_trees)
 
3850
            if len(possible_trees) == 0:
 
3851
                # There either aren't any parents, or the parents are ghosts,
 
3852
                # so just use the last converted tree.
 
3853
                possible_trees.append((basis_id, cache[basis_id]))
3599
3854
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
3600
 
                                                           basis_id, cache)
 
3855
                                                           possible_trees)
 
3856
            revision = self.source.get_revision(current_revision_id)
 
3857
            pending_deltas.append((basis_id, delta,
 
3858
                current_revision_id, revision.parent_ids))
3601
3859
            if self._converting_to_rich_root:
3602
3860
                self._revision_id_to_root_id[current_revision_id] = \
3603
3861
                    tree.get_root_id()
3604
 
            # Find text entries that need to be copied
 
3862
            # Determine which texts are in present in this revision but not in
 
3863
            # any of the available parents.
 
3864
            texts_possibly_new_in_tree = set()
3605
3865
            for old_path, new_path, file_id, entry in delta:
3606
 
                if new_path is not None:
3607
 
                    if not new_path:
3608
 
                        # This is the root
3609
 
                        if not self.target.supports_rich_root():
3610
 
                            # The target doesn't support rich root, so we don't
3611
 
                            # copy
3612
 
                            continue
3613
 
                        if self._converting_to_rich_root:
3614
 
                            # This can't be copied normally, we have to insert
3615
 
                            # it specially
3616
 
                            root_keys_to_create.add((file_id, entry.revision))
3617
 
                            continue
3618
 
                    text_keys.add((file_id, entry.revision))
3619
 
            revision = self.source.get_revision(current_revision_id)
3620
 
            pending_deltas.append((basis_id, delta,
3621
 
                current_revision_id, revision.parent_ids))
 
3866
                if new_path is None:
 
3867
                    # This file_id isn't present in the new rev
 
3868
                    continue
 
3869
                if not new_path:
 
3870
                    # This is the root
 
3871
                    if not self.target.supports_rich_root():
 
3872
                        # The target doesn't support rich root, so we don't
 
3873
                        # copy
 
3874
                        continue
 
3875
                    if self._converting_to_rich_root:
 
3876
                        # This can't be copied normally, we have to insert
 
3877
                        # it specially
 
3878
                        root_keys_to_create.add((file_id, entry.revision))
 
3879
                        continue
 
3880
                kind = entry.kind
 
3881
                texts_possibly_new_in_tree.add((file_id, entry.revision))
 
3882
            for basis_id, basis_tree in possible_trees:
 
3883
                basis_inv = basis_tree.inventory
 
3884
                for file_key in list(texts_possibly_new_in_tree):
 
3885
                    file_id, file_revision = file_key
 
3886
                    try:
 
3887
                        entry = basis_inv[file_id]
 
3888
                    except errors.NoSuchId:
 
3889
                        continue
 
3890
                    if entry.revision == file_revision:
 
3891
                        texts_possibly_new_in_tree.remove(file_key)
 
3892
            text_keys.update(texts_possibly_new_in_tree)
3622
3893
            pending_revisions.append(revision)
3623
3894
            cache[current_revision_id] = tree
3624
3895
            basis_id = current_revision_id
3626
3897
        from_texts = self.source.texts
3627
3898
        to_texts = self.target.texts
3628
3899
        if root_keys_to_create:
3629
 
            root_stream = self._new_root_data_stream(root_keys_to_create,
3630
 
                                                     parent_map)
 
3900
            root_stream = _mod_fetch._new_root_data_stream(
 
3901
                root_keys_to_create, self._revision_id_to_root_id, parent_map,
 
3902
                self.source, graph=a_graph)
3631
3903
            to_texts.insert_record_stream(root_stream)
3632
3904
        to_texts.insert_record_stream(from_texts.get_record_stream(
3633
3905
            text_keys, self.target._format._fetch_order,
3640
3912
            # for the new revisions that we are about to insert.  We do this
3641
3913
            # before adding the revisions so that no revision is added until
3642
3914
            # all the inventories it may depend on are added.
 
3915
            # Note that this is overzealous, as we may have fetched these in an
 
3916
            # earlier batch.
3643
3917
            parent_ids = set()
3644
3918
            revision_ids = set()
3645
3919
            for revision in pending_revisions:
3648
3922
            parent_ids.difference_update(revision_ids)
3649
3923
            parent_ids.discard(_mod_revision.NULL_REVISION)
3650
3924
            parent_map = self.source.get_parent_map(parent_ids)
3651
 
            for parent_tree in self.source.revision_trees(parent_ids):
3652
 
                basis_id, delta = self._get_delta_for_revision(tree, parent_ids, basis_id, cache)
 
3925
            # we iterate over parent_map and not parent_ids because we don't
 
3926
            # want to try copying any revision which is a ghost
 
3927
            for parent_tree in self.source.revision_trees(parent_map):
3653
3928
                current_revision_id = parent_tree.get_revision_id()
3654
3929
                parents_parents = parent_map[current_revision_id]
 
3930
                possible_trees = self._get_trees(parents_parents, cache)
 
3931
                if len(possible_trees) == 0:
 
3932
                    # There either aren't any parents, or the parents are
 
3933
                    # ghosts, so just use the last converted tree.
 
3934
                    possible_trees.append((basis_id, cache[basis_id]))
 
3935
                basis_id, delta = self._get_delta_for_revision(parent_tree,
 
3936
                    parents_parents, possible_trees)
3655
3937
                self.target.add_inventory_by_delta(
3656
3938
                    basis_id, delta, current_revision_id, parents_parents)
3657
3939
        # insert signatures and revisions
3671
3953
 
3672
3954
        :param revision_ids: The list of revisions to fetch. Must be in
3673
3955
            topological order.
3674
 
        :param pb: A ProgressBar
 
3956
        :param pb: A ProgressTask
3675
3957
        :return: None
3676
3958
        """
3677
3959
        basis_id, basis_tree = self._get_basis(revision_ids[0])
3680
3962
        cache[basis_id] = basis_tree
3681
3963
        del basis_tree # We don't want to hang on to it here
3682
3964
        hints = []
 
3965
        if self._converting_to_rich_root and len(revision_ids) > 100:
 
3966
            a_graph = _mod_fetch._get_rich_root_heads_graph(self.source,
 
3967
                                                            revision_ids)
 
3968
        else:
 
3969
            a_graph = None
 
3970
 
3683
3971
        for offset in range(0, len(revision_ids), batch_size):
3684
3972
            self.target.start_write_group()
3685
3973
            try:
3686
3974
                pb.update('Transferring revisions', offset,
3687
3975
                          len(revision_ids))
3688
3976
                batch = revision_ids[offset:offset+batch_size]
3689
 
                basis_id = self._fetch_batch(batch, basis_id, cache)
 
3977
                basis_id = self._fetch_batch(batch, basis_id, cache,
 
3978
                                             a_graph=a_graph)
3690
3979
            except:
3691
3980
                self.target.abort_write_group()
3692
3981
                raise
3722
4011
        # Walk though all revisions; get inventory deltas, copy referenced
3723
4012
        # texts that delta references, insert the delta, revision and
3724
4013
        # signature.
3725
 
        first_rev = self.source.get_revision(revision_ids[0])
3726
4014
        if pb is None:
3727
4015
            my_pb = ui.ui_factory.nested_progress_bar()
3728
4016
            pb = my_pb
3805
4093
                                                  self.source_repo.is_shared())
3806
4094
        converted.lock_write()
3807
4095
        try:
3808
 
            self.step('Copying content into repository.')
 
4096
            self.step('Copying content')
3809
4097
            self.source_repo.copy_content_into(converted)
3810
4098
        finally:
3811
4099
            converted.unlock()
3812
 
        self.step('Deleting old repository content.')
 
4100
        self.step('Deleting old repository content')
3813
4101
        self.repo_dir.transport.delete_tree('repository.backup')
3814
 
        self.pb.note('repository converted')
 
4102
        ui.ui_factory.note('repository converted')
3815
4103
 
3816
4104
    def step(self, message):
3817
4105
        """Update the pb by a step."""
3851
4139
 
3852
4140
class _VersionedFileChecker(object):
3853
4141
 
3854
 
    def __init__(self, repository, text_key_references=None):
 
4142
    def __init__(self, repository, text_key_references=None, ancestors=None):
3855
4143
        self.repository = repository
3856
4144
        self.text_index = self.repository._generate_text_key_index(
3857
 
            text_key_references=text_key_references)
 
4145
            text_key_references=text_key_references, ancestors=ancestors)
3858
4146
 
3859
4147
    def calculate_file_version_parents(self, text_key):
3860
4148
        """Calculate the correct parents for a file version according to
3878
4166
            revision_id) tuples for versions that are present in this versioned
3879
4167
            file, but not used by the corresponding inventory.
3880
4168
        """
 
4169
        local_progress = None
 
4170
        if progress_bar is None:
 
4171
            local_progress = ui.ui_factory.nested_progress_bar()
 
4172
            progress_bar = local_progress
 
4173
        try:
 
4174
            return self._check_file_version_parents(texts, progress_bar)
 
4175
        finally:
 
4176
            if local_progress:
 
4177
                local_progress.finished()
 
4178
 
 
4179
    def _check_file_version_parents(self, texts, progress_bar):
 
4180
        """See check_file_version_parents."""
3881
4181
        wrong_parents = {}
3882
4182
        self.file_ids = set([file_id for file_id, _ in
3883
4183
            self.text_index.iterkeys()])
3884
4184
        # text keys is now grouped by file_id
3885
 
        n_weaves = len(self.file_ids)
3886
 
        files_in_revisions = {}
3887
 
        revisions_of_files = {}
3888
4185
        n_versions = len(self.text_index)
3889
4186
        progress_bar.update('loading text store', 0, n_versions)
3890
4187
        parent_map = self.repository.texts.get_parent_map(self.text_index)
3892
4189
        text_keys = self.repository.texts.keys()
3893
4190
        unused_keys = frozenset(text_keys) - set(self.text_index)
3894
4191
        for num, key in enumerate(self.text_index.iterkeys()):
3895
 
            if progress_bar is not None:
3896
 
                progress_bar.update('checking text graph', num, n_versions)
 
4192
            progress_bar.update('checking text graph', num, n_versions)
3897
4193
            correct_parents = self.calculate_file_version_parents(key)
3898
4194
            try:
3899
4195
                knit_parents = parent_map[key]
3984
4280
            else:
3985
4281
                new_pack.set_write_cache_size(1024*1024)
3986
4282
        for substream_type, substream in stream:
 
4283
            if 'stream' in debug.debug_flags:
 
4284
                mutter('inserting substream: %s', substream_type)
3987
4285
            if substream_type == 'texts':
3988
4286
                self.target_repo.texts.insert_record_stream(substream)
3989
4287
            elif substream_type == 'inventories':
3993
4291
                else:
3994
4292
                    self._extract_and_insert_inventories(
3995
4293
                        substream, src_serializer)
 
4294
            elif substream_type == 'inventory-deltas':
 
4295
                self._extract_and_insert_inventory_deltas(
 
4296
                    substream, src_serializer)
3996
4297
            elif substream_type == 'chk_bytes':
3997
4298
                # XXX: This doesn't support conversions, as it assumes the
3998
4299
                #      conversion was done in the fetch code.
4029
4330
                ):
4030
4331
                if versioned_file is None:
4031
4332
                    continue
 
4333
                # TODO: key is often going to be a StaticTuple object
 
4334
                #       I don't believe we can define a method by which
 
4335
                #       (prefix,) + StaticTuple will work, though we could
 
4336
                #       define a StaticTuple.sq_concat that would allow you to
 
4337
                #       pass in either a tuple or a StaticTuple as the second
 
4338
                #       object, so instead we could have:
 
4339
                #       StaticTuple(prefix) + key here...
4032
4340
                missing_keys.update((prefix,) + key for key in
4033
4341
                    versioned_file.get_missing_compression_parent_keys())
4034
4342
        except NotImplementedError:
4049
4357
            self.target_repo.pack(hint=hint)
4050
4358
        return [], set()
4051
4359
 
4052
 
    def _extract_and_insert_inventories(self, substream, serializer):
 
4360
    def _extract_and_insert_inventory_deltas(self, substream, serializer):
 
4361
        target_rich_root = self.target_repo._format.rich_root_data
 
4362
        target_tree_refs = self.target_repo._format.supports_tree_reference
 
4363
        for record in substream:
 
4364
            # Insert the delta directly
 
4365
            inventory_delta_bytes = record.get_bytes_as('fulltext')
 
4366
            deserialiser = inventory_delta.InventoryDeltaDeserializer()
 
4367
            try:
 
4368
                parse_result = deserialiser.parse_text_bytes(
 
4369
                    inventory_delta_bytes)
 
4370
            except inventory_delta.IncompatibleInventoryDelta, err:
 
4371
                trace.mutter("Incompatible delta: %s", err.msg)
 
4372
                raise errors.IncompatibleRevision(self.target_repo._format)
 
4373
            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
 
4374
            revision_id = new_id
 
4375
            parents = [key[0] for key in record.parents]
 
4376
            self.target_repo.add_inventory_by_delta(
 
4377
                basis_id, inv_delta, revision_id, parents)
 
4378
 
 
4379
    def _extract_and_insert_inventories(self, substream, serializer,
 
4380
            parse_delta=None):
4053
4381
        """Generate a new inventory versionedfile in target, converting data.
4054
4382
 
4055
4383
        The inventory is retrieved from the source, (deserializing it), and
4056
4384
        stored in the target (reserializing it in a different format).
4057
4385
        """
 
4386
        target_rich_root = self.target_repo._format.rich_root_data
 
4387
        target_tree_refs = self.target_repo._format.supports_tree_reference
4058
4388
        for record in substream:
 
4389
            # It's not a delta, so it must be a fulltext in the source
 
4390
            # serializer's format.
4059
4391
            bytes = record.get_bytes_as('fulltext')
4060
4392
            revision_id = record.key[0]
4061
4393
            inv = serializer.read_inventory_from_string(bytes, revision_id)
4062
4394
            parents = [key[0] for key in record.parents]
4063
4395
            self.target_repo.add_inventory(revision_id, inv, parents)
 
4396
            # No need to keep holding this full inv in memory when the rest of
 
4397
            # the substream is likely to be all deltas.
 
4398
            del inv
4064
4399
 
4065
4400
    def _extract_and_insert_revisions(self, substream, serializer):
4066
4401
        for record in substream:
4115
4450
        return [('signatures', signatures), ('revisions', revisions)]
4116
4451
 
4117
4452
    def _generate_root_texts(self, revs):
4118
 
        """This will be called by __fetch between fetching weave texts and
 
4453
        """This will be called by get_stream between fetching weave texts and
4119
4454
        fetching the inventory weave.
4120
 
 
4121
 
        Subclasses should override this if they need to generate root texts
4122
 
        after fetching weave texts.
4123
4455
        """
4124
4456
        if self._rich_root_upgrade():
4125
 
            import bzrlib.fetch
4126
 
            return bzrlib.fetch.Inter1and2Helper(
 
4457
            return _mod_fetch.Inter1and2Helper(
4127
4458
                self.from_repository).generate_root_texts(revs)
4128
4459
        else:
4129
4460
            return []
4132
4463
        phase = 'file'
4133
4464
        revs = search.get_keys()
4134
4465
        graph = self.from_repository.get_graph()
4135
 
        revs = list(graph.iter_topo_order(revs))
 
4466
        revs = tsort.topo_sort(graph.get_parent_map(revs))
4136
4467
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
4137
4468
        text_keys = []
4138
4469
        for knit_kind, file_id, revisions in data_to_fetch:
4157
4488
                # will be valid.
4158
4489
                for _ in self._generate_root_texts(revs):
4159
4490
                    yield _
4160
 
                # NB: This currently reopens the inventory weave in source;
4161
 
                # using a single stream interface instead would avoid this.
4162
 
                from_weave = self.from_repository.inventories
4163
4491
                # we fetch only the referenced inventories because we do not
4164
4492
                # know for unselected inventories whether all their required
4165
4493
                # texts are present in the other repository - it could be
4204
4532
            if not keys:
4205
4533
                # No need to stream something we don't have
4206
4534
                continue
 
4535
            if substream_kind == 'inventories':
 
4536
                # Some missing keys are genuinely ghosts, filter those out.
 
4537
                present = self.from_repository.inventories.get_parent_map(keys)
 
4538
                revs = [key[0] for key in present]
 
4539
                # Get the inventory stream more-or-less as we do for the
 
4540
                # original stream; there's no reason to assume that records
 
4541
                # direct from the source will be suitable for the sink.  (Think
 
4542
                # e.g. 2a -> 1.9-rich-root).
 
4543
                for info in self._get_inventory_stream(revs, missing=True):
 
4544
                    yield info
 
4545
                continue
 
4546
 
4207
4547
            # Ask for full texts always so that we don't need more round trips
4208
4548
            # after this stream.
4209
4549
            # Some of the missing keys are genuinely ghosts, so filter absent
4224
4564
        return (not self.from_repository._format.rich_root_data and
4225
4565
            self.to_format.rich_root_data)
4226
4566
 
4227
 
    def _get_inventory_stream(self, revision_ids):
 
4567
    def _get_inventory_stream(self, revision_ids, missing=False):
4228
4568
        from_format = self.from_repository._format
4229
 
        if (from_format.supports_chks and self.to_format.supports_chks
4230
 
            and (from_format._serializer == self.to_format._serializer)):
4231
 
            # Both sides support chks, and they use the same serializer, so it
4232
 
            # is safe to transmit the chk pages and inventory pages across
4233
 
            # as-is.
4234
 
            return self._get_chk_inventory_stream(revision_ids)
4235
 
        elif (not from_format.supports_chks):
4236
 
            # Source repository doesn't support chks. So we can transmit the
4237
 
            # inventories 'as-is' and either they are just accepted on the
4238
 
            # target, or the Sink will properly convert it.
4239
 
            return self._get_simple_inventory_stream(revision_ids)
 
4569
        if (from_format.supports_chks and self.to_format.supports_chks and
 
4570
            from_format.network_name() == self.to_format.network_name()):
 
4571
            raise AssertionError(
 
4572
                "this case should be handled by GroupCHKStreamSource")
 
4573
        elif 'forceinvdeltas' in debug.debug_flags:
 
4574
            return self._get_convertable_inventory_stream(revision_ids,
 
4575
                    delta_versus_null=missing)
 
4576
        elif from_format.network_name() == self.to_format.network_name():
 
4577
            # Same format.
 
4578
            return self._get_simple_inventory_stream(revision_ids,
 
4579
                    missing=missing)
 
4580
        elif (not from_format.supports_chks and not self.to_format.supports_chks
 
4581
                and from_format._serializer == self.to_format._serializer):
 
4582
            # Essentially the same format.
 
4583
            return self._get_simple_inventory_stream(revision_ids,
 
4584
                    missing=missing)
4240
4585
        else:
4241
 
            # XXX: Hack to make not-chk->chk fetch: copy the inventories as
4242
 
            #      inventories. Note that this should probably be done somehow
4243
 
            #      as part of bzrlib.repository.StreamSink. Except JAM couldn't
4244
 
            #      figure out how a non-chk repository could possibly handle
4245
 
            #      deserializing an inventory stream from a chk repo, as it
4246
 
            #      doesn't have a way to understand individual pages.
4247
 
            return self._get_convertable_inventory_stream(revision_ids)
 
4586
            # Any time we switch serializations, we want to use an
 
4587
            # inventory-delta based approach.
 
4588
            return self._get_convertable_inventory_stream(revision_ids,
 
4589
                    delta_versus_null=missing)
4248
4590
 
4249
 
    def _get_simple_inventory_stream(self, revision_ids):
 
4591
    def _get_simple_inventory_stream(self, revision_ids, missing=False):
 
4592
        # NB: This currently reopens the inventory weave in source;
 
4593
        # using a single stream interface instead would avoid this.
4250
4594
        from_weave = self.from_repository.inventories
 
4595
        if missing:
 
4596
            delta_closure = True
 
4597
        else:
 
4598
            delta_closure = not self.delta_on_metadata()
4251
4599
        yield ('inventories', from_weave.get_record_stream(
4252
4600
            [(rev_id,) for rev_id in revision_ids],
4253
 
            self.inventory_fetch_order(),
4254
 
            not self.delta_on_metadata()))
4255
 
 
4256
 
    def _get_chk_inventory_stream(self, revision_ids):
4257
 
        """Fetch the inventory texts, along with the associated chk maps."""
4258
 
        # We want an inventory outside of the search set, so that we can filter
4259
 
        # out uninteresting chk pages. For now we use
4260
 
        # _find_revision_outside_set, but if we had a Search with cut_revs, we
4261
 
        # could use that instead.
4262
 
        start_rev_id = self.from_repository._find_revision_outside_set(
4263
 
                            revision_ids)
4264
 
        start_rev_key = (start_rev_id,)
4265
 
        inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
4266
 
        if start_rev_id != _mod_revision.NULL_REVISION:
4267
 
            inv_keys_to_fetch.append((start_rev_id,))
4268
 
        # Any repo that supports chk_bytes must also support out-of-order
4269
 
        # insertion. At least, that is how we expect it to work
4270
 
        # We use get_record_stream instead of iter_inventories because we want
4271
 
        # to be able to insert the stream as well. We could instead fetch
4272
 
        # allowing deltas, and then iter_inventories, but we don't know whether
4273
 
        # source or target is more 'local' anway.
4274
 
        inv_stream = self.from_repository.inventories.get_record_stream(
4275
 
            inv_keys_to_fetch, 'unordered',
4276
 
            True) # We need them as full-texts so we can find their references
4277
 
        uninteresting_chk_roots = set()
4278
 
        interesting_chk_roots = set()
4279
 
        def filter_inv_stream(inv_stream):
4280
 
            for idx, record in enumerate(inv_stream):
4281
 
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
4282
 
                bytes = record.get_bytes_as('fulltext')
4283
 
                chk_inv = inventory.CHKInventory.deserialise(
4284
 
                    self.from_repository.chk_bytes, bytes, record.key)
4285
 
                if record.key == start_rev_key:
4286
 
                    uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
4287
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4288
 
                    if p_id_map is not None:
4289
 
                        uninteresting_chk_roots.add(p_id_map.key())
4290
 
                else:
4291
 
                    yield record
4292
 
                    interesting_chk_roots.add(chk_inv.id_to_entry.key())
4293
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4294
 
                    if p_id_map is not None:
4295
 
                        interesting_chk_roots.add(p_id_map.key())
4296
 
        ### pb.update('fetch inventory', 0, 2)
4297
 
        yield ('inventories', filter_inv_stream(inv_stream))
4298
 
        # Now that we have worked out all of the interesting root nodes, grab
4299
 
        # all of the interesting pages and insert them
4300
 
        ### pb.update('fetch inventory', 1, 2)
4301
 
        interesting = chk_map.iter_interesting_nodes(
4302
 
            self.from_repository.chk_bytes, interesting_chk_roots,
4303
 
            uninteresting_chk_roots)
4304
 
        def to_stream_adapter():
4305
 
            """Adapt the iter_interesting_nodes result to a single stream.
4306
 
 
4307
 
            iter_interesting_nodes returns records as it processes them, along
4308
 
            with keys. However, we only want to return the records themselves.
4309
 
            """
4310
 
            for record, items in interesting:
4311
 
                if record is not None:
4312
 
                    yield record
4313
 
        # XXX: We could instead call get_record_stream(records.keys())
4314
 
        #      ATM, this will always insert the records as fulltexts, and
4315
 
        #      requires that you can hang on to records once you have gone
4316
 
        #      on to the next one. Further, it causes the target to
4317
 
        #      recompress the data. Testing shows it to be faster than
4318
 
        #      requesting the records again, though.
4319
 
        yield ('chk_bytes', to_stream_adapter())
4320
 
        ### pb.update('fetch inventory', 2, 2)
4321
 
 
4322
 
    def _get_convertable_inventory_stream(self, revision_ids):
4323
 
        # XXX: One of source or target is using chks, and they don't have
4324
 
        #      compatible serializations. The StreamSink code expects to be
4325
 
        #      able to convert on the target, so we need to put
4326
 
        #      bytes-on-the-wire that can be converted
4327
 
        yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
4328
 
 
4329
 
    def _stream_invs_as_fulltexts(self, revision_ids):
 
4601
            self.inventory_fetch_order(), delta_closure))
 
4602
 
 
4603
    def _get_convertable_inventory_stream(self, revision_ids,
 
4604
                                          delta_versus_null=False):
 
4605
        # The source is using CHKs, but the target either doesn't or it has a
 
4606
        # different serializer.  The StreamSink code expects to be able to
 
4607
        # convert on the target, so we need to put bytes-on-the-wire that can
 
4608
        # be converted.  That means inventory deltas (if the remote is <1.19,
 
4609
        # RemoteStreamSink will fallback to VFS to insert the deltas).
 
4610
        yield ('inventory-deltas',
 
4611
           self._stream_invs_as_deltas(revision_ids,
 
4612
                                       delta_versus_null=delta_versus_null))
 
4613
 
 
4614
    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
 
4615
        """Return a stream of inventory-deltas for the given rev ids.
 
4616
 
 
4617
        :param revision_ids: The list of inventories to transmit
 
4618
        :param delta_versus_null: Don't try to find a minimal delta for this
 
4619
            entry, instead compute the delta versus the NULL_REVISION. This
 
4620
            effectively streams a complete inventory. Used for stuff like
 
4621
            filling in missing parents, etc.
 
4622
        """
4330
4623
        from_repo = self.from_repository
4331
 
        from_serializer = from_repo._format._serializer
4332
4624
        revision_keys = [(rev_id,) for rev_id in revision_ids]
4333
4625
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
4334
 
        for inv in self.from_repository.iter_inventories(revision_ids):
4335
 
            # XXX: This is a bit hackish, but it works. Basically,
4336
 
            #      CHKSerializer 'accidentally' supports
4337
 
            #      read/write_inventory_to_string, even though that is never
4338
 
            #      the format that is stored on disk. It *does* give us a
4339
 
            #      single string representation for an inventory, so live with
4340
 
            #      it for now.
4341
 
            #      This would be far better if we had a 'serialized inventory
4342
 
            #      delta' form. Then we could use 'inventory._make_delta', and
4343
 
            #      transmit that. This would both be faster to generate, and
4344
 
            #      result in fewer bytes-on-the-wire.
4345
 
            as_bytes = from_serializer.write_inventory_to_string(inv)
 
4626
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
 
4627
        # method...
 
4628
        inventories = self.from_repository.iter_inventories(
 
4629
            revision_ids, 'topological')
 
4630
        format = from_repo._format
 
4631
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
 
4632
        inventory_cache = lru_cache.LRUCache(50)
 
4633
        null_inventory = from_repo.revision_tree(
 
4634
            _mod_revision.NULL_REVISION).inventory
 
4635
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
 
4636
        # per-repo (e.g.  streaming a non-rich-root revision out of a rich-root
 
4637
        # repo back into a non-rich-root repo ought to be allowed)
 
4638
        serializer = inventory_delta.InventoryDeltaSerializer(
 
4639
            versioned_root=format.rich_root_data,
 
4640
            tree_references=format.supports_tree_reference)
 
4641
        for inv in inventories:
4346
4642
            key = (inv.revision_id,)
4347
4643
            parent_keys = parent_map.get(key, ())
 
4644
            delta = None
 
4645
            if not delta_versus_null and parent_keys:
 
4646
                # The caller did not ask for complete inventories and we have
 
4647
                # some parents that we can delta against.  Make a delta against
 
4648
                # each parent so that we can find the smallest.
 
4649
                parent_ids = [parent_key[0] for parent_key in parent_keys]
 
4650
                for parent_id in parent_ids:
 
4651
                    if parent_id not in invs_sent_so_far:
 
4652
                        # We don't know that the remote side has this basis, so
 
4653
                        # we can't use it.
 
4654
                        continue
 
4655
                    if parent_id == _mod_revision.NULL_REVISION:
 
4656
                        parent_inv = null_inventory
 
4657
                    else:
 
4658
                        parent_inv = inventory_cache.get(parent_id, None)
 
4659
                        if parent_inv is None:
 
4660
                            parent_inv = from_repo.get_inventory(parent_id)
 
4661
                    candidate_delta = inv._make_delta(parent_inv)
 
4662
                    if (delta is None or
 
4663
                        len(delta) > len(candidate_delta)):
 
4664
                        delta = candidate_delta
 
4665
                        basis_id = parent_id
 
4666
            if delta is None:
 
4667
                # Either none of the parents ended up being suitable, or we
 
4668
                # were asked to delta against NULL
 
4669
                basis_id = _mod_revision.NULL_REVISION
 
4670
                delta = inv._make_delta(null_inventory)
 
4671
            invs_sent_so_far.add(inv.revision_id)
 
4672
            inventory_cache[inv.revision_id] = inv
 
4673
            delta_serialized = ''.join(
 
4674
                serializer.delta_to_lines(basis_id, key[-1], delta))
4348
4675
            yield versionedfile.FulltextContentFactory(
4349
 
                key, parent_keys, None, as_bytes)
 
4676
                key, parent_keys, None, delta_serialized)
4350
4677
 
4351
4678
 
4352
4679
def _iter_for_revno(repo, partial_history_cache, stop_index=None,