/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Vincent Ladeuil
  • Date: 2012-01-18 14:09:19 UTC
  • mto: This revision was merged to the branch mainline in revision 6468.
  • Revision ID: v.ladeuil+lp@free.fr-20120118140919-rlvdrhpc0nq1lbwi
Change set/remove to require a lock for the branch config files.

This means that tests (or any plugin for that matter) do not require an
explicit lock on the branch anymore to change a single option. This also
means the optimisation becomes "opt-in" and as such won't be as
spectacular as it may be and/or harder to get right (nothing fails
anymore).

This reduces the diff by ~300 lines.

Code/tests that were updating more than one config option are still taking
a lock to at least avoid some IOs and demonstrate the benefits through
the decreased number of hpss calls.

The duplication between BranchStack and BranchOnlyStack will be removed
once the same sharing is in place for local config files, at which point
the Stack class itself may be able to host the changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
"""Tests for foreign VCS utility code."""
 
19
 
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    bzrdir,
 
24
    controldir,
 
25
    errors,
 
26
    foreign,
 
27
    lockable_files,
 
28
    lockdir,
 
29
    repository,
 
30
    revision,
 
31
    tests,
 
32
    trace,
 
33
    vf_repository,
 
34
    )
 
35
 
 
36
from bzrlib.repofmt import groupcompress_repo
 
37
 
 
38
# This is the dummy foreign revision control system, used 
 
39
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
40
# It is basically standard Bazaar with some minor modifications to 
 
41
# make it "foreign". 
 
42
 
43
# It has the following differences to "regular" Bazaar:
 
44
# - The control directory is named ".dummy", not ".bzr".
 
45
# - The revision ids are tuples, not strings.
 
46
# - Doesn't support more than one parent natively
 
47
 
 
48
 
 
49
class DummyForeignVcsMapping(foreign.VcsMapping):
    """Mapping between bzr revision ids and dummy foreign revision ids.

    Foreign revision ids are tuples of strings; their bzr form is the
    tuple's items joined with "-" behind a "dummy-v1:" prefix.
    """

    def __eq__(self, other):
        """Compare equal to any object whose type equals this one's type."""
        own_type = type(self)
        return own_type == type(other)

    def revision_id_bzr_to_foreign(self, bzr_revid):
        """Split a "dummy-v1:" revision id into its foreign components.

        The first len("dummy-v1:") characters are dropped without being
        checked, and the remainder is split on "-".

        :param bzr_revid: The bzr revision id to convert.
        :return: A (foreign_revid, mapping) pair, where foreign_revid is a
            tuple of strings and mapping is this object.
        """
        prefix_length = len("dummy-v1:")
        components = bzr_revid[prefix_length:].split("-")
        return tuple(components), self

    def revision_id_foreign_to_bzr(self, foreign_revid):
        """Build the bzr revision id for a three-item foreign revision id.

        :param foreign_revid: A tuple of three items.
        :return: "dummy-v1:" followed by the three items joined with "-".
        """
        bzr_revid = "dummy-v1:%s-%s-%s" % foreign_revid
        return bzr_revid
 
60
 
 
61
 
 
62
class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
    """Registry of the mappings available for the dummy foreign VCS."""

    def revision_id_bzr_to_foreign(self, revid):
        """Convert a "dummy-" revision id using the mapping it names.

        The two characters following the "dummy-" prefix (for example
        "v1") are the key used to look up the mapping in this registry.

        :param revid: The bzr revision id to convert.
        :return: Whatever the selected mapping's
            revision_id_bzr_to_foreign() returns for revid.
        :raises errors.InvalidRevisionId: If revid does not start with
            "dummy-".
        """
        if revid.startswith("dummy-"):
            version_start = len("dummy-")
            version_end = len("dummy-vx")
            mapping = self.get(revid[version_start:version_end])
            return mapping.revision_id_bzr_to_foreign(revid)
        raise errors.InvalidRevisionId(revid, None)
 
70
 
 
71
 
 
72
class DummyForeignVcs(foreign.ForeignVcs):
    """A dummy Foreign VCS, for use with testing.

    Its revision ids are tuples of three strings.
    """

    def __init__(self):
        """Create the mapping registry and register the "v1" mapping."""
        registry = DummyForeignVcsMappingRegistry()
        self.mapping_registry = registry
        registry.register("v1", DummyForeignVcsMapping(self), "Version 1")
        self.abbreviation = "dummy"

    def show_foreign_revid(self, foreign_revid):
        """Return a one-entry dict describing a foreign revision id.

        :param foreign_revid: A tuple of three items.
        :return: A dict whose single key is "dummy ding" and whose value
            is the three items separated by "/" and a backslash.
        """
        rendered = "%s/%s\\%s" % foreign_revid
        return {"dummy ding": rendered}

    def serialize_foreign_revid(self, foreign_revid):
        """Return the three items of foreign_revid joined with "|"."""
        serialized = "%s|%s|%s" % foreign_revid
        return serialized
 
89
 
 
90
 
 
91
class DummyForeignVcsBranch(branch.BzrBranch6, foreign.ForeignBranch):
    """A Dummy VCS Branch."""

    @property
    def user_transport(self):
        """The user transport of the control directory holding this branch."""
        control_dir = self.bzrdir
        return control_dir.user_transport

    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
        """Set up both the foreign-branch and the BzrBranch6 parts.

        :param _format: The branch format object.
        :param _control_files: Control files handed on to BzrBranch6.
        :param a_bzrdir: The control directory containing the branch; its
            transport base becomes this branch's base.
        """
        self._format = _format
        self._base = a_bzrdir.transport.base
        self._ignore_fallbacks = False
        self.bzrdir = a_bzrdir
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
        foreign.ForeignBranch.__init__(self, mapping)
        branch.BzrBranch6.__init__(
            self, _format, _control_files, a_bzrdir, *args, **kwargs)

    def _get_checkout_format(self, lightweight=False):
        """Return the metadir format to use for a checkout of this branch.

        This simply delegates to the control directory's
        checkout_metadir(); the lightweight flag is not consulted.
        """
        control_dir = self.bzrdir
        return control_dir.checkout_metadir()

    def import_last_revision_info_and_tags(self, source, revno, revid,
                                           lossy=False):
        """Push revid from source into this branch and report the new tip.

        The push itself is always performed lossily, whatever the value of
        the lossy argument.

        :param source: The branch to copy revisions from.
        :param revno: Revision number, returned unchanged.
        :param revid: The source revision id to push up to.
        :param lossy: When true, the returned revision id is the one the
            push mapped revid to; otherwise revid is returned as given.
        :return: A (revno, revid) tuple.
        """
        push_result = InterToDummyVcsBranch(source, self).push(
            stop_revision=revid, lossy=True)
        if not lossy:
            return (revno, revid)
        return (revno, push_result.revidmap[revid])
 
121
 
 
122
 
 
123
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
    """Commit builder that can mint dummy foreign revision ids."""

    def _generate_revision_if_needed(self):
        """Choose the new revision id and record whether it is random.

        For a lossy commit the id is derived from the timestamp and
        timezone, with "UNKNOWN" as the third component. Otherwise an id
        supplied up front is kept, and only when none was supplied is one
        generated with _gen_revision_id().
        """
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
        if self._lossy:
            foreign_revid = (
                str(self._timestamp), str(self._timezone), "UNKNOWN")
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
                foreign_revid)
            self.random_revid = False
            return
        if self._new_revision_id is None:
            self._new_revision_id = self._gen_revision_id()
            self.random_revid = True
        else:
            self.random_revid = False
 
136
 
 
137
 
 
138
class DummyForeignVcsRepository(
        groupcompress_repo.CHKInventoryRepository,
        foreign.ForeignRepository):
    """Repository for the dummy foreign VCS.

    Combines CHKInventoryRepository with ForeignRepository and adds
    nothing of its own.
    """
 
141
 
 
142
 
 
143
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
    """Repository format for the dummy foreign VCS.

    A RepositoryFormat2a subclass that swaps in the dummy repository and
    commit builder classes and reports its own format string.
    """

    # Classes substituted for the ones RepositoryFormat2a would use.
    _commit_builder_class = DummyForeignCommitBuilder
    repository_class = DummyForeignVcsRepository

    @classmethod
    def get_format_string(cls):
        """Return the string identifying this repository format."""
        format_string = "Dummy Foreign Vcs Repository"
        return format_string

    def get_format_description(self):
        """Return the description of this repository format."""
        description = "Dummy Foreign Vcs Repository"
        return description
 
154
 
 
155
 
 
156
def branch_history(graph, revid):
    """Return the left-hand ancestry of revid as a list.

    :param graph: An object providing iter_lefthand_ancestry().
    :param revid: The revision id to start walking from.
    :return: The revision ids yielded by
        graph.iter_lefthand_ancestry(revid, (NULL_REVISION,)), in the
        reverse of the order in which they were yielded.
    """
    stop_keys = (revision.NULL_REVISION,)
    ancestry = graph.iter_lefthand_ancestry(revid, stop_keys)
    return list(ancestry)[::-1]
 
161
 
 
162
 
 
163
class InterToDummyVcsBranch(branch.GenericInterBranch):
    """InterBranch that lossily pushes into a DummyForeignVcsBranch."""

    @staticmethod
    def is_compatible(source, target):
        """Return True when target is a DummyForeignVcsBranch.

        The source branch is not inspected.
        """
        return isinstance(target, DummyForeignVcsBranch)

    def push(self, overwrite=False, stop_revision=None, lossy=False):
        """Lossily copy missing left-hand revisions from source to target.

        Every source revision on the left-hand ancestry of stop_revision
        that comes after the target's current history is re-committed in
        the target under a new dummy revision id, and the target's last
        revision info is advanced after each commit. Only simple cases are
        handled, which is good enough for tests.

        :param overwrite: Not used by this implementation.
        :param stop_revision: Last source revision to push. Defaults to
            the source branch's last revision.
        :param lossy: Must be true; a non-lossy push is rejected.
        :return: A branch.BranchPushResult with source_branch,
            target_branch, old_revno/old_revid and new_revno/new_revid
            set, plus a revidmap dict mapping each pushed source revision
            id to the revision id committed in the target.
        :raises errors.NoRoundtrippingSupport: If lossy is false.
        :raises errors.DivergedBranches: If the target's history is not a
            prefix of the source's history.
        """
        if not lossy:
            raise errors.NoRoundtrippingSupport(self.source, self.target)
        result = branch.BranchPushResult()
        result.source_branch = self.source
        result.target_branch = self.target
        result.old_revno, result.old_revid = self.target.last_revision_info()
        self.source.lock_read()
        try:
            graph = self.source.repository.get_graph()
            # This just handles simple cases, but that's good enough for tests
            my_history = branch_history(self.target.repository.get_graph(),
                                        result.old_revid)
            if stop_revision is None:
                stop_revision = self.source.last_revision()
            their_history = branch_history(graph, stop_revision)
            # Slicing clamps to the list length, so a shorter source
            # history simply fails the comparison below.
            if their_history[:len(my_history)] != my_history:
                raise errors.DivergedBranches(self.target, self.source)
            todo = their_history[len(my_history):]
            revidmap = {}
            for revid in todo:
                rev = self.source.repository.get_revision(revid)
                tree = self.source.repository.revision_tree(revid)

                # Give the tree a get_file_with_stat() that returns the
                # file object with no stat value.
                def get_file_with_stat(file_id, path=None):
                    return (tree.get_file(file_id), None)
                tree.get_file_with_stat = get_file_with_stat
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
                    (str(rev.timestamp), str(rev.timezone),
                     str(self.target.revno())))
                parent_revno, parent_revid = self.target.last_revision_info()
                if parent_revid == revision.NULL_REVISION:
                    parent_revids = []
                else:
                    parent_revids = [parent_revid]
                builder = self.target.get_commit_builder(
                    parent_revids, self.target.get_config_stack(),
                    rev.timestamp, rev.timezone, rev.committer,
                    rev.properties, new_revid)
                try:
                    # The parent inventory is the same for every entry of
                    # this revision, so look it up once rather than once
                    # per entry.
                    parent_invs = [self.target.repository.revision_tree(
                        parent_revid).inventory]
                    for path, ie in tree.inventory.iter_entries():
                        new_ie = ie.copy()
                        new_ie.revision = None
                        builder.record_entry_contents(
                            new_ie, parent_invs, path, tree,
                            (ie.kind, ie.text_size, ie.executable,
                             ie.text_sha1))
                    builder.finish_inventory()
                except:
                    # Deliberately catch everything: the builder must be
                    # aborted on any failure, and the error is re-raised.
                    builder.abort()
                    raise
                revidmap[revid] = builder.commit(rev.message)
                self.target.set_last_revision_info(parent_revno + 1,
                                                   revidmap[revid])
                trace.mutter('lossily pushed revision %s -> %s',
                             revid, revidmap[revid])
        finally:
            self.source.unlock()
        result.new_revno, result.new_revid = self.target.last_revision_info()
        result.revidmap = revidmap
        return result
 
229
 
 
230
 
 
231
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
    """Branch format for the dummy foreign VCS."""

    @classmethod
    def get_format_string(cls):
        """Return the string identifying this branch format."""
        return "Branch for Testing"

    @property
    def _matchingbzrdir(self):
        """A new DummyForeignVcsDirFormat instance."""
        return DummyForeignVcsDirFormat()

    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
            found_repository=None):
        """Open the dummy branch stored in a_bzrdir.

        :param a_bzrdir: The control directory containing the branch.
        :param name: Branch name handed to a_bzrdir.get_branch_transport().
        :param _found: Must be true; opening without it is not implemented.
        :param ignore_fallbacks: Not used by this implementation.
        :param found_repository: Repository to attach to the branch. When
            None, a_bzrdir.find_repository() is used.
        :return: A DummyForeignVcsBranch.
        :raises NotImplementedError: If _found is false.
        :raises errors.NotBranchError: If errors.NoSuchFile is raised
            while opening.
        """
        if not _found:
            raise NotImplementedError
        # Bound before the try block so the NoSuchFile handler can tell
        # whether the branch transport was ever obtained.
        transport = None
        try:
            transport = a_bzrdir.get_branch_transport(None, name=name)
            control_files = lockable_files.LockableFiles(transport, 'lock',
                                                         lockdir.LockDir)
            if found_repository is None:
                found_repository = a_bzrdir.find_repository()
            return DummyForeignVcsBranch(_format=self,
                              _control_files=control_files,
                              a_bzrdir=a_bzrdir,
                              _repository=found_repository)
        except errors.NoSuchFile:
            if transport is None:
                # get_branch_transport() itself failed; fall back to the
                # control directory's own location for the error path.
                path = a_bzrdir.transport.base
            else:
                path = transport.base
            raise errors.NotBranchError(path=path)
 
257
 
 
258
 
 
259
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat for the dummy foreign VCS."""

    @classmethod
    def get_format_string(cls):
        """Return the string identifying this control directory format."""
        format_string = "A Dummy VCS Dir"
        return format_string

    @classmethod
    def get_format_description(cls):
        """Return the description of this format."""
        description = "A Dummy VCS Dir"
        return description

    @classmethod
    def is_supported(cls):
        """Report this format as supported (always True)."""
        return True

    def get_branch_format(self):
        """Return a new DummyForeignVcsBranchFormat instance."""
        branch_format = DummyForeignVcsBranchFormat()
        return branch_format

    @property
    def repository_format(self):
        """A new DummyForeignVcsRepositoryFormat instance."""
        repo_format = DummyForeignVcsRepositoryFormat()
        return repo_format

    def initialize_on_transport(self, transport):
        """Create a ".dummy" control directory at the base of transport.

        :param transport: Transport for the directory to initialize.
        :return: The result of opening the new control directory with
            self.open(transport, _found=True).
        """
        # There is no .bzr directory here, so the directory mode is taken
        # from the root directory instead.
        temp_control = lockable_files.LockableFiles(
            transport, '', lockable_files.TransportLock)
        # FIXME: RBC 20060121 don't peek under the covers
        root_transport = temp_control._transport
        root_transport.mkdir('.dummy', mode=temp_control._dir_mode)
        del root_transport
        del temp_control
        # NB: no need to escape relative paths that are url safe.
        dummy_transport = transport.clone('.dummy')
        lockable_files.LockableFiles(
            dummy_transport, self._lock_file_name,
            self._lock_class).create_lock()
        return self.open(transport, _found=True)

    def _open(self, transport):
        """Return a DummyForeignVcsDir for transport using this format."""
        return DummyForeignVcsDir(transport, self)
 
301
 
 
302
 
 
303
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
    """Control directory that lives in ".dummy" rather than ".bzr"."""

    def __init__(self, _transport, _format):
        """Point the control transport at the ".dummy" subdirectory.

        :param _transport: Transport for the directory containing ".dummy".
        :param _format: The control directory format object.
        """
        self._format = _format
        control_transport = _transport.clone('.dummy')
        self.transport = control_transport
        self.root_transport = _transport
        self._mode_check_done = False
        self._control_files = lockable_files.LockableFiles(
            control_transport, "lock", lockable_files.TransportLock)

    def create_workingtree(self):
        """Create a working tree, first writing a placeholder ".bzr" file."""
        # dirstate requires a ".bzr" entry to exist
        self.root_transport.put_bytes(".bzr", "foo")
        parent = super(DummyForeignVcsDir, self)
        return parent.create_workingtree()

    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True,
            possible_transports=None):
        """Open the single branch held in this control directory.

        :param name: Must be None; named branches are rejected.
        :param unsupported: Not used by this implementation.
        :param ignore_fallbacks: Not used by this implementation.
        :param possible_transports: Not used by this implementation.
        :return: The branch opened through this directory's branch format.
        :raises errors.NoColocatedBranchSupport: If name is not None.
        """
        if name is None:
            branch_format = self._format.get_branch_format()
            return branch_format.open(self, _found=True)
        raise errors.NoColocatedBranchSupport(self)

    def cloning_metadir(self, stacked=False):
        """Produce a metadir suitable for cloning with.

        The registry's "default" format is returned; stacked is not used.
        """
        registry = bzrdir.format_registry
        return registry.make_bzrdir("default")

    def checkout_metadir(self):
        """Return the same metadir as cloning_metadir()."""
        return self.cloning_metadir()

    def sprout(self, url, revision_id=None, force_new_repo=False,
               recurse='down', possible_transports=None,
               accelerator_tree=None, hardlink=False, stacked=False,
               source_branch=None):
        """Sprout via the parent class, dropping accelerator_tree.

        Every argument except accelerator_tree is forwarded unchanged to
        the parent class's sprout().
        """
        # dirstate doesn't cope well with accelerator_trees that have a
        # different control dir, so it is intentionally not passed on.
        forwarded = dict(
            url=url,
            revision_id=revision_id,
            force_new_repo=force_new_repo,
            recurse=recurse,
            possible_transports=possible_transports,
            hardlink=hardlink,
            stacked=stacked,
            source_branch=source_branch)
        return super(DummyForeignVcsDir, self).sprout(**forwarded)
 
341
 
 
342
 
 
343
def register_dummy_foreign_for_test(testcase):
    """Register the dummy foreign VCS for the duration of one test.

    The prober, the repository format, the branch format and the
    inter-branch optimiser are registered in that order, and a matching
    cleanup is added to testcase straight after each registration.

    :param testcase: Test case whose addCleanup() receives the cleanups.
    """
    prober_registry = controldir.ControlDirFormat
    prober_registry.register_prober(DummyForeignProber)
    testcase.addCleanup(prober_registry.unregister_prober, DummyForeignProber)
    format_registrations = (
        (repository.format_registry, DummyForeignVcsRepositoryFormat),
        (branch.format_registry, DummyForeignVcsBranchFormat),
        )
    for registry, format_class in format_registrations:
        registry.register(format_class())
        testcase.addCleanup(registry.remove, format_class())
    # We need to register the optimiser to make the dummy appear really
    # different from a regular bzr repository.
    optimisers = branch.InterBranch
    optimisers.register_optimiser(InterToDummyVcsBranch)
    testcase.addCleanup(optimisers.unregister_optimiser,
                        InterToDummyVcsBranch)
 
358
 
 
359
 
 
360
class DummyForeignProber(controldir.Prober):
    """Prober that recognises directories containing ".dummy"."""

    @classmethod
    def probe_transport(cls, transport):
        """Return the dummy format if transport has a ".dummy" entry.

        :param transport: Transport for the directory to probe.
        :return: A new DummyForeignVcsDirFormat instance.
        :raises errors.NotBranchError: If transport has no ".dummy" entry.
        """
        if transport.has('.dummy'):
            return DummyForeignVcsDirFormat()
        raise errors.NotBranchError(path=transport.base)

    @classmethod
    def known_formats(cls):
        """Return a set holding one DummyForeignVcsDirFormat instance."""
        formats = set()
        formats.add(DummyForeignVcsDirFormat())
        return formats
 
372
 
 
373
 
 
374
class ForeignVcsRegistryTests(tests.TestCase):
    """Tests for the ForeignVcsRegistry class."""

    def test_parse_revision_id_no_dash(self):
        """A revision id without a dash is rejected as invalid."""
        reg = foreign.ForeignVcsRegistry()
        self.assertRaises(errors.InvalidRevisionId,
                          reg.parse_revision_id, "invalid")

    def test_parse_revision_id_unknown_mapping(self):
        """A revision id whose prefix is not registered is rejected."""
        reg = foreign.ForeignVcsRegistry()
        self.assertRaises(errors.InvalidRevisionId,
                          reg.parse_revision_id, "unknown-foreignrevid")

    def test_parse_revision_id(self):
        """A registered VCS parses its revision ids into (revid, mapping)."""
        reg = foreign.ForeignVcsRegistry()
        vcs = DummyForeignVcs()
        reg.register("dummy", vcs, "Dummy VCS")
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual((
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
394
 
 
395
 
 
396
class ForeignRevisionTests(tests.TestCase):
    """Tests for the ForeignRevision class."""

    def test_create(self):
        """A new ForeignRevision exposes its foreign revid and mapping."""
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
                                      mapp, "roundtripped-revid")
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual("", rev.inventory_sha1)
        self.assertEqual(("a", "foreign", "revid"), rev.foreign_revid)
        self.assertEqual(mapp, rev.mapping)
 
406
 
 
407
 
 
408
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
    """Tests for update_workingtree_fileids()."""

    def test_update_workingtree(self):
        """The working tree takes over the file ids of the target basis."""
        wt = self.make_branch_and_tree('br1')
        self.build_tree_contents([('br1/bla', 'original contents\n')])
        wt.add('bla', 'bla-a')
        wt.commit('bla-a')
        root_id = wt.get_root_id()
        # In the sprouted tree, re-add the same path under a new file id.
        target = wt.bzrdir.sprout('br2').open_workingtree()
        target.unversion(['bla-a'])
        target.add('bla', 'bla-b')
        target.commit('bla-b')
        target_basis = target.basis_tree()
        target_basis.lock_read()
        self.addCleanup(target_basis.unlock)
        foreign.update_workingtree_fileids(wt, target_basis)
        wt.lock_read()
        try:
            # assertEqual rather than the deprecated assertEquals alias.
            self.assertEqual(set([root_id, "bla-b"]), set(wt.inventory))
        finally:
            wt.unlock()
 
430
 
 
431
 
 
432
class DummyForeignVcsTests(tests.TestCaseWithTransport):
    """Very basic test for DummyForeignVcs."""

    # The deprecated assertEquals/assertNotEquals aliases are avoided
    # throughout in favour of assertEqual/assertNotEqual.

    def setUp(self):
        """Register the dummy foreign VCS for each test."""
        super(DummyForeignVcsTests, self).setUp()
        register_dummy_foreign_for_test(self)

    def test_create(self):
        """Test we can create dummies."""
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
        # Named control_dir rather than dir to avoid shadowing the builtin.
        control_dir = bzrdir.BzrDir.open("d")
        self.assertEqual("A Dummy VCS Dir",
                         control_dir._format.get_format_string())
        control_dir.open_repository()
        control_dir.open_branch()
        control_dir.open_workingtree()

    def test_sprout(self):
        """Test we can clone dummies and that the format is not preserved."""
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
        control_dir = bzrdir.BzrDir.open("d")
        newdir = control_dir.sprout("e")
        self.assertNotEqual("A Dummy VCS Dir",
                            newdir._format.get_format_string())

    def test_push_not_supported(self):
        """A non-lossy push into a dummy branch is refused."""
        source_tree = self.make_branch_and_tree("source")
        target_tree = self.make_branch_and_tree("target",
            format=DummyForeignVcsDirFormat())
        self.assertRaises(errors.NoRoundtrippingSupport,
            source_tree.branch.push, target_tree.branch)

    def test_lossy_push_empty(self):
        """A lossy push of an empty branch maps no revisions."""
        source_tree = self.make_branch_and_tree("source")
        target_tree = self.make_branch_and_tree("target",
            format=DummyForeignVcsDirFormat())
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
        self.assertEqual(revision.NULL_REVISION, pushresult.old_revid)
        self.assertEqual(revision.NULL_REVISION, pushresult.new_revid)
        self.assertEqual({}, pushresult.revidmap)

    def test_lossy_push_simple(self):
        """A lossy push of one revision maps it to the target's new tip."""
        source_tree = self.make_branch_and_tree("source")
        self.build_tree(['source/a', 'source/b'])
        source_tree.add(['a', 'b'])
        revid1 = source_tree.commit("msg")
        target_tree = self.make_branch_and_tree("target",
            format=DummyForeignVcsDirFormat())
        target_tree.branch.lock_write()
        try:
            pushresult = source_tree.branch.push(
                target_tree.branch, lossy=True)
        finally:
            target_tree.branch.unlock()
        self.assertEqual(revision.NULL_REVISION, pushresult.old_revid)
        self.assertEqual({revid1: target_tree.branch.last_revision()},
                         pushresult.revidmap)
        self.assertEqual(pushresult.revidmap[revid1], pushresult.new_revid)