/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-12 22:51:31 UTC
  • mto: This revision was merged to the branch mainline in revision 4955.
  • Revision ID: john@arbash-meinel.com-20100112225131-he8h411p6aeeb947
Delay grabbing an output stream until we actually go to show a diff.

This makes the test suite happy, but it also seems to be reasonable.
If we aren't going to write anything, we don't need to hold an
output stream open.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
"""Tests for foreign VCS utility code."""
 
19
 
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    bzrdir,
 
24
    errors,
 
25
    foreign,
 
26
    lockable_files,
 
27
    lockdir,
 
28
    revision,
 
29
    tests,
 
30
    trace,
 
31
    )
 
32
 
 
33
# This is the dummy foreign revision control system, used 
 
34
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
35
# It is basically standard Bazaar with some minor modifications to 
 
36
# make it "foreign". 
 
37
 
38
# It has the following differences to "regular" Bazaar:
 
39
# - The control directory is named ".dummy", not ".bzr".
 
40
# - The revision ids are tuples, not strings.
 
41
# - Doesn't support more than one parent natively
 
42
 
 
43
 
 
44
class DummyForeignVcsMapping(foreign.VcsMapping):
 
45
    """A simple mapping for the dummy Foreign VCS, for use with testing."""
 
46
 
 
47
    def __eq__(self, other):
 
48
        return type(self) == type(other)
 
49
 
 
50
    def revision_id_bzr_to_foreign(self, bzr_revid):
 
51
        return tuple(bzr_revid[len("dummy-v1:"):].split("-")), self
 
52
 
 
53
    def revision_id_foreign_to_bzr(self, foreign_revid):
 
54
        return "dummy-v1:%s-%s-%s" % foreign_revid
 
55
 
 
56
 
 
57
class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
 
58
 
 
59
    def revision_id_bzr_to_foreign(self, revid):
 
60
        if not revid.startswith("dummy-"):
 
61
            raise errors.InvalidRevisionId(revid, None)
 
62
        mapping_version = revid[len("dummy-"):len("dummy-vx")]
 
63
        mapping = self.get(mapping_version)
 
64
        return mapping.revision_id_bzr_to_foreign(revid)
 
65
 
 
66
 
 
67
class DummyForeignVcs(foreign.ForeignVcs):
 
68
    """A dummy Foreign VCS, for use with testing.
 
69
 
 
70
    It has revision ids that are a tuple with three strings.
 
71
    """
 
72
 
 
73
    def __init__(self):
 
74
        self.mapping_registry = DummyForeignVcsMappingRegistry()
 
75
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
 
76
                                       "Version 1")
 
77
        self.abbreviation = "dummy"
 
78
 
 
79
    def show_foreign_revid(self, foreign_revid):
 
80
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
 
81
 
 
82
    def serialize_foreign_revid(self, foreign_revid):
 
83
        return "%s|%s|%s" % foreign_revid
 
84
 
 
85
 
 
86
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
 
87
    """A Dummy VCS Branch."""
 
88
 
 
89
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
 
90
        self._format = _format
 
91
        self._base = a_bzrdir.transport.base
 
92
        self._ignore_fallbacks = False
 
93
        foreign.ForeignBranch.__init__(self, 
 
94
            DummyForeignVcsMapping(DummyForeignVcs()))
 
95
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
 
96
            *args, **kwargs)
 
97
 
 
98
 
 
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
100
                            foreign.InterToForeignBranch):
 
101
 
 
102
    @staticmethod
 
103
    def is_compatible(source, target):
 
104
        return isinstance(target, DummyForeignVcsBranch)
 
105
 
 
106
    def push(self, overwrite=False, stop_revision=None):
 
107
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
 
 
109
    def lossy_push(self, stop_revision=None):
 
110
        result = branch.BranchPushResult()
 
111
        result.source_branch = self.source
 
112
        result.target_branch = self.target
 
113
        result.old_revno, result.old_revid = self.target.last_revision_info()
 
114
        self.source.lock_read()
 
115
        try:
 
116
            # This just handles simple cases, but that's good enough for tests
 
117
            my_history = self.target.revision_history()
 
118
            their_history = self.source.revision_history()
 
119
            if their_history[:min(len(my_history), len(their_history))] != my_history:
 
120
                raise errors.DivergedBranches(self.target, self.source)
 
121
            todo = their_history[len(my_history):]
 
122
            revidmap = {}
 
123
            for revid in todo:
 
124
                rev = self.source.repository.get_revision(revid)
 
125
                tree = self.source.repository.revision_tree(revid)
 
126
                def get_file_with_stat(file_id, path=None):
 
127
                    return (tree.get_file(file_id), None)
 
128
                tree.get_file_with_stat = get_file_with_stat
 
129
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
 
130
                    (str(rev.timestamp), str(rev.timezone), 
 
131
                        str(self.target.revno())))
 
132
                parent_revno, parent_revid= self.target.last_revision_info()
 
133
                if parent_revid == revision.NULL_REVISION:
 
134
                    parent_revids = []
 
135
                else:
 
136
                    parent_revids = [parent_revid]
 
137
                builder = self.target.get_commit_builder(parent_revids, 
 
138
                        self.target.get_config(), rev.timestamp,
 
139
                        rev.timezone, rev.committer, rev.properties,
 
140
                        new_revid)
 
141
                try:
 
142
                    for path, ie in tree.inventory.iter_entries():
 
143
                        new_ie = ie.copy()
 
144
                        new_ie.revision = None
 
145
                        builder.record_entry_contents(new_ie, 
 
146
                            [self.target.repository.revision_tree(parent_revid).inventory],
 
147
                            path, tree, 
 
148
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
 
149
                    builder.finish_inventory()
 
150
                except:
 
151
                    builder.abort()
 
152
                    raise
 
153
                revidmap[revid] = builder.commit(rev.message)
 
154
                self.target.set_last_revision_info(parent_revno+1, 
 
155
                    revidmap[revid])
 
156
                trace.mutter('lossily pushed revision %s -> %s', 
 
157
                    revid, revidmap[revid])
 
158
        finally:
 
159
            self.source.unlock()
 
160
        result.new_revno, result.new_revid = self.target.last_revision_info()
 
161
        result.revidmap = revidmap
 
162
        return result
 
163
 
 
164
 
 
165
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
 
166
 
 
167
    def get_format_string(self):
 
168
        return "Branch for Testing"
 
169
 
 
170
    def __init__(self):
 
171
        super(DummyForeignVcsBranchFormat, self).__init__()
 
172
        self._matchingbzrdir = DummyForeignVcsDirFormat()
 
173
 
 
174
    def open(self, a_bzrdir, _found=False):
 
175
        if not _found:
 
176
            raise NotImplementedError
 
177
        try:
 
178
            transport = a_bzrdir.get_branch_transport(None)
 
179
            control_files = lockable_files.LockableFiles(transport, 'lock',
 
180
                                                         lockdir.LockDir)
 
181
            return DummyForeignVcsBranch(_format=self,
 
182
                              _control_files=control_files,
 
183
                              a_bzrdir=a_bzrdir,
 
184
                              _repository=a_bzrdir.find_repository())
 
185
        except errors.NoSuchFile:
 
186
            raise errors.NotBranchError(path=transport.base)
 
187
 
 
188
 
 
189
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
 
190
    """BzrDirFormat for the dummy foreign VCS."""
 
191
 
 
192
    @classmethod
 
193
    def get_format_string(cls):
 
194
        return "A Dummy VCS Dir"
 
195
 
 
196
    @classmethod
 
197
    def get_format_description(cls):
 
198
        return "A Dummy VCS Dir"
 
199
 
 
200
    @classmethod
 
201
    def is_supported(cls):
 
202
        return True
 
203
 
 
204
    def get_branch_format(self):
 
205
        return DummyForeignVcsBranchFormat()
 
206
 
 
207
    @classmethod
 
208
    def probe_transport(klass, transport):
 
209
        """Return the .bzrdir style format present in a directory."""
 
210
        if not transport.has('.dummy'):
 
211
            raise errors.NotBranchError(path=transport.base)
 
212
        return klass()
 
213
 
 
214
    def initialize_on_transport(self, transport):
 
215
        """Initialize a new bzrdir in the base directory of a Transport."""
 
216
        # Since we don't have a .bzr directory, inherit the
 
217
        # mode from the root directory
 
218
        temp_control = lockable_files.LockableFiles(transport,
 
219
                            '', lockable_files.TransportLock)
 
220
        temp_control._transport.mkdir('.dummy',
 
221
                                      # FIXME: RBC 20060121 don't peek under
 
222
                                      # the covers
 
223
                                      mode=temp_control._dir_mode)
 
224
        del temp_control
 
225
        bzrdir_transport = transport.clone('.dummy')
 
226
        # NB: no need to escape relative paths that are url safe.
 
227
        control_files = lockable_files.LockableFiles(bzrdir_transport,
 
228
            self._lock_file_name, self._lock_class)
 
229
        control_files.create_lock()
 
230
        return self.open(transport, _found=True)
 
231
 
 
232
    def _open(self, transport):
 
233
        return DummyForeignVcsDir(transport, self)
 
234
 
 
235
 
 
236
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
 
237
 
 
238
    def __init__(self, _transport, _format):
 
239
        self._format = _format
 
240
        self.transport = _transport.clone('.dummy')
 
241
        self.root_transport = _transport
 
242
        self._mode_check_done = False
 
243
        self._control_files = lockable_files.LockableFiles(self.transport,
 
244
            "lock", lockable_files.TransportLock)
 
245
 
 
246
    def open_branch(self, ignore_fallbacks=True):
 
247
        return self._format.get_branch_format().open(self, _found=True)
 
248
 
 
249
    def cloning_metadir(self, stacked=False):
 
250
        """Produce a metadir suitable for cloning with."""
 
251
        return bzrdir.format_registry.make_bzrdir("default")
 
252
 
 
253
    def sprout(self, url, revision_id=None, force_new_repo=False,
 
254
               recurse='down', possible_transports=None,
 
255
               accelerator_tree=None, hardlink=False, stacked=False,
 
256
               source_branch=None):
 
257
        # dirstate doesn't cope with accelerator_trees well 
 
258
        # that have a different control dir
 
259
        return super(DummyForeignVcsDir, self).sprout(url=url, 
 
260
                revision_id=revision_id, force_new_repo=force_new_repo, 
 
261
                recurse=recurse, possible_transports=possible_transports, 
 
262
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
 
263
 
 
264
 
 
265
def register_dummy_foreign_for_test(testcase):
 
266
    bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
267
    testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
 
268
                        DummyForeignVcsDirFormat)
 
269
    # We need to register the optimiser to make the dummy appears really
 
270
    # different from a regular bzr repository.
 
271
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
272
    testcase.addCleanup(branch.InterBranch.unregister_optimiser,
 
273
                        InterToDummyVcsBranch)
 
274
 
 
275
 
 
276
class ForeignVcsRegistryTests(tests.TestCase):
 
277
    """Tests for the ForeignVcsRegistry class."""
 
278
 
 
279
    def test_parse_revision_id_no_dash(self):
 
280
        reg = foreign.ForeignVcsRegistry()
 
281
        self.assertRaises(errors.InvalidRevisionId,
 
282
                          reg.parse_revision_id, "invalid")
 
283
 
 
284
    def test_parse_revision_id_unknown_mapping(self):
 
285
        reg = foreign.ForeignVcsRegistry()
 
286
        self.assertRaises(errors.InvalidRevisionId,
 
287
                          reg.parse_revision_id, "unknown-foreignrevid")
 
288
 
 
289
    def test_parse_revision_id(self):
 
290
        reg = foreign.ForeignVcsRegistry()
 
291
        vcs = DummyForeignVcs()
 
292
        reg.register("dummy", vcs, "Dummy VCS")
 
293
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
294
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
295
 
 
296
 
 
297
class ForeignRevisionTests(tests.TestCase):
 
298
    """Tests for the ForeignRevision class."""
 
299
 
 
300
    def test_create(self):
 
301
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
 
302
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
 
303
                                      mapp, "roundtripped-revid")
 
304
        self.assertEquals("", rev.inventory_sha1)
 
305
        self.assertEquals(("a", "foreign", "revid"), rev.foreign_revid)
 
306
        self.assertEquals(mapp, rev.mapping)
 
307
 
 
308
 
 
309
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
 
310
    """Tests for update_workingtree_fileids()."""
 
311
 
 
312
    def test_update_workingtree(self):
 
313
        wt = self.make_branch_and_tree('br1')
 
314
        self.build_tree_contents([('br1/bla', 'original contents\n')])
 
315
        wt.add('bla', 'bla-a')
 
316
        wt.commit('bla-a')
 
317
        root_id = wt.get_root_id()
 
318
        target = wt.bzrdir.sprout('br2').open_workingtree()
 
319
        target.unversion(['bla-a'])
 
320
        target.add('bla', 'bla-b')
 
321
        target.commit('bla-b')
 
322
        target_basis = target.basis_tree()
 
323
        target_basis.lock_read()
 
324
        self.addCleanup(target_basis.unlock)
 
325
        foreign.update_workingtree_fileids(wt, target_basis)
 
326
        wt.lock_read()
 
327
        try:
 
328
            self.assertEquals(set([root_id, "bla-b"]), set(wt.inventory))
 
329
        finally:
 
330
            wt.unlock()
 
331
 
 
332
 
 
333
class DummyForeignVcsTests(tests.TestCaseWithTransport):
 
334
    """Very basic test for DummyForeignVcs."""
 
335
 
 
336
    def setUp(self):
 
337
        super(DummyForeignVcsTests, self).setUp()
 
338
        register_dummy_foreign_for_test(self)
 
339
 
 
340
    def test_create(self):
 
341
        """Test we can create dummies."""
 
342
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
343
        dir = bzrdir.BzrDir.open("d")
 
344
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
 
345
        dir.open_repository()
 
346
        dir.open_branch()
 
347
        dir.open_workingtree()
 
348
 
 
349
    def test_sprout(self):
 
350
        """Test we can clone dummies and that the format is not preserved."""
 
351
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
352
        dir = bzrdir.BzrDir.open("d")
 
353
        newdir = dir.sprout("e")
 
354
        self.assertNotEquals("A Dummy VCS Dir",
 
355
                             newdir._format.get_format_string())
 
356
 
 
357
    def test_push_not_supported(self):
 
358
        source_tree = self.make_branch_and_tree("source")
 
359
        target_tree = self.make_branch_and_tree("target", 
 
360
            format=DummyForeignVcsDirFormat())
 
361
        self.assertRaises(errors.NoRoundtrippingSupport, 
 
362
            source_tree.branch.push, target_tree.branch)
 
363
 
 
364
    def test_lossy_push_empty(self):
 
365
        source_tree = self.make_branch_and_tree("source")
 
366
        target_tree = self.make_branch_and_tree("target", 
 
367
            format=DummyForeignVcsDirFormat())
 
368
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
369
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
370
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
 
371
        self.assertEquals({}, pushresult.revidmap)
 
372
 
 
373
    def test_lossy_push_simple(self):
 
374
        source_tree = self.make_branch_and_tree("source")
 
375
        self.build_tree(['source/a', 'source/b'])
 
376
        source_tree.add(['a', 'b'])
 
377
        revid1 = source_tree.commit("msg")
 
378
        target_tree = self.make_branch_and_tree("target", 
 
379
            format=DummyForeignVcsDirFormat())
 
380
        target_tree.branch.lock_write()
 
381
        try:
 
382
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
383
        finally:
 
384
            target_tree.branch.unlock()
 
385
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
386
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
 
387
                           pushresult.revidmap)
 
388
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)