/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: John Arbash Meinel
  • Date: 2009-06-18 18:18:36 UTC
  • mto: This revision was merged to the branch mainline in revision 4461.
  • Revision ID: john@arbash-meinel.com-20090618181836-biodfkat9a8eyzjz
The new add_inventory_by_delta is returning a CHKInventory when mapping from NULL
Which is completely valid, but 'broke' one of the tests.
So to fix it, changed the test to use CHKInventories on both sides, and add an __eq__
member. The nice thing is that CHKInventory.__eq__ is fairly cheap, since it only
has to check the root keys.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
"""Tests for foreign VCS utility code."""
 
19
 
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    errors,
 
24
    foreign,
 
25
    lockable_files,
 
26
    lockdir,
 
27
    trace,
 
28
    )
 
29
from bzrlib.bzrdir import (
 
30
    BzrDir,
 
31
    BzrDirFormat,
 
32
    BzrDirMeta1,
 
33
    BzrDirMetaFormat1,
 
34
    format_registry,
 
35
    )
 
36
from bzrlib.inventory import Inventory
 
37
from bzrlib.revision import (
 
38
    NULL_REVISION,
 
39
    Revision,
 
40
    )
 
41
from bzrlib.tests import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
 
45
 
 
46
# This is the dummy foreign revision control system, used 
 
47
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
48
# It is basically standard Bazaar with some minor modifications to 
 
49
# make it "foreign". 
 
50
 
51
# It has the following differences to "regular" Bazaar:
 
52
# - The control directory is named ".dummy", not ".bzr".
 
53
# - The revision ids are tuples, not strings.
 
54
# - Doesn't support more than one parent natively
 
55
 
 
56
 
 
57
class DummyForeignVcsMapping(foreign.VcsMapping):
 
58
    """A simple mapping for the dummy Foreign VCS, for use with testing."""
 
59
 
 
60
    def __eq__(self, other):
 
61
        return type(self) == type(other)
 
62
 
 
63
    def revision_id_bzr_to_foreign(self, bzr_revid):
 
64
        return tuple(bzr_revid[len("dummy-v1:"):].split("-")), self
 
65
 
 
66
    def revision_id_foreign_to_bzr(self, foreign_revid):
 
67
        return "dummy-v1:%s-%s-%s" % foreign_revid
 
68
 
 
69
 
 
70
class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
 
71
 
 
72
    def revision_id_bzr_to_foreign(self, revid):
 
73
        if not revid.startswith("dummy-"):
 
74
            raise errors.InvalidRevisionId(revid, None)
 
75
        mapping_version = revid[len("dummy-"):len("dummy-vx")]
 
76
        mapping = self.get(mapping_version)
 
77
        return mapping.revision_id_bzr_to_foreign(revid)
 
78
 
 
79
 
 
80
class DummyForeignVcs(foreign.ForeignVcs):
 
81
    """A dummy Foreign VCS, for use with testing.
 
82
 
 
83
    It has revision ids that are a tuple with three strings.
 
84
    """
 
85
 
 
86
    def __init__(self):
 
87
        self.mapping_registry = DummyForeignVcsMappingRegistry()
 
88
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
 
89
                                       "Version 1")
 
90
 
 
91
    def show_foreign_revid(self, foreign_revid):
 
92
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
 
93
 
 
94
 
 
95
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
 
96
    """A Dummy VCS Branch."""
 
97
 
 
98
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
 
99
        self._format = _format
 
100
        self._base = a_bzrdir.transport.base
 
101
        self._ignore_fallbacks = False
 
102
        foreign.ForeignBranch.__init__(self, 
 
103
            DummyForeignVcsMapping(DummyForeignVcs()))
 
104
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
 
105
            *args, **kwargs)
 
106
 
 
107
 
 
108
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
109
                            foreign.InterToForeignBranch):
 
110
 
 
111
    @staticmethod
 
112
    def is_compatible(source, target):
 
113
        return isinstance(target, DummyForeignVcsBranch)
 
114
 
 
115
    def lossy_push(self, stop_revision=None):
 
116
        result = branch.BranchPushResult()
 
117
        result.source_branch = self.source
 
118
        result.target_branch = self.target
 
119
        result.old_revno, result.old_revid = self.target.last_revision_info()
 
120
        self.source.lock_read()
 
121
        try:
 
122
            # This just handles simple cases, but that's good enough for tests
 
123
            my_history = self.target.revision_history()
 
124
            their_history = self.source.revision_history()
 
125
            if their_history[:min(len(my_history), len(their_history))] != my_history:
 
126
                raise errors.DivergedBranches(self.target, self.source)
 
127
            todo = their_history[len(my_history):]
 
128
            revidmap = {}
 
129
            for revid in todo:
 
130
                rev = self.source.repository.get_revision(revid)
 
131
                tree = self.source.repository.revision_tree(revid)
 
132
                def get_file_with_stat(file_id, path=None):
 
133
                    return (tree.get_file(file_id), None)
 
134
                tree.get_file_with_stat = get_file_with_stat
 
135
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
 
136
                    (str(rev.timestamp), str(rev.timezone), 
 
137
                        str(self.target.revno())))
 
138
                parent_revno, parent_revid= self.target.last_revision_info()
 
139
                if parent_revid == NULL_REVISION:
 
140
                    parent_revids = []
 
141
                else:
 
142
                    parent_revids = [parent_revid]
 
143
                builder = self.target.get_commit_builder(parent_revids, 
 
144
                        self.target.get_config(), rev.timestamp,
 
145
                        rev.timezone, rev.committer, rev.properties,
 
146
                        new_revid)
 
147
                try:
 
148
                    for path, ie in tree.inventory.iter_entries():
 
149
                        new_ie = ie.copy()
 
150
                        new_ie.revision = None
 
151
                        builder.record_entry_contents(new_ie, 
 
152
                            [self.target.repository.revision_tree(parent_revid).inventory],
 
153
                            path, tree, 
 
154
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
 
155
                    builder.finish_inventory()
 
156
                except:
 
157
                    builder.abort()
 
158
                    raise
 
159
                revidmap[revid] = builder.commit(rev.message)
 
160
                self.target.set_last_revision_info(parent_revno+1, 
 
161
                    revidmap[revid])
 
162
                trace.mutter('lossily pushed revision %s -> %s', 
 
163
                    revid, revidmap[revid])
 
164
        finally:
 
165
            self.source.unlock()
 
166
        result.new_revno, result.new_revid = self.target.last_revision_info()
 
167
        result.revidmap = revidmap
 
168
        return result
 
169
 
 
170
 
 
171
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
 
172
 
 
173
    def get_format_string(self):
 
174
        return "Branch for Testing"
 
175
 
 
176
    def __init__(self):
 
177
        super(DummyForeignVcsBranchFormat, self).__init__()
 
178
        self._matchingbzrdir = DummyForeignVcsDirFormat()
 
179
 
 
180
    def open(self, a_bzrdir, _found=False):
 
181
        if not _found:
 
182
            raise NotImplementedError
 
183
        try:
 
184
            transport = a_bzrdir.get_branch_transport(None)
 
185
            control_files = lockable_files.LockableFiles(transport, 'lock',
 
186
                                                         lockdir.LockDir)
 
187
            return DummyForeignVcsBranch(_format=self,
 
188
                              _control_files=control_files,
 
189
                              a_bzrdir=a_bzrdir,
 
190
                              _repository=a_bzrdir.find_repository())
 
191
        except errors.NoSuchFile:
 
192
            raise errors.NotBranchError(path=transport.base)
 
193
 
 
194
 
 
195
class DummyForeignVcsDirFormat(BzrDirMetaFormat1):
 
196
    """BzrDirFormat for the dummy foreign VCS."""
 
197
 
 
198
    @classmethod
 
199
    def get_format_string(cls):
 
200
        return "A Dummy VCS Dir"
 
201
 
 
202
    @classmethod
 
203
    def get_format_description(cls):
 
204
        return "A Dummy VCS Dir"
 
205
 
 
206
    @classmethod
 
207
    def is_supported(cls):
 
208
        return True
 
209
 
 
210
    def get_branch_format(self):
 
211
        return DummyForeignVcsBranchFormat()
 
212
 
 
213
    @classmethod
 
214
    def probe_transport(klass, transport):
 
215
        """Return the .bzrdir style format present in a directory."""
 
216
        if not transport.has('.dummy'):
 
217
            raise errors.NotBranchError(path=transport.base)
 
218
        return klass()
 
219
 
 
220
    def initialize_on_transport(self, transport):
 
221
        """Initialize a new bzrdir in the base directory of a Transport."""
 
222
        # Since we don't have a .bzr directory, inherit the
 
223
        # mode from the root directory
 
224
        temp_control = lockable_files.LockableFiles(transport,
 
225
                            '', lockable_files.TransportLock)
 
226
        temp_control._transport.mkdir('.dummy',
 
227
                                      # FIXME: RBC 20060121 don't peek under
 
228
                                      # the covers
 
229
                                      mode=temp_control._dir_mode)
 
230
        del temp_control
 
231
        bzrdir_transport = transport.clone('.dummy')
 
232
        # NB: no need to escape relative paths that are url safe.
 
233
        control_files = lockable_files.LockableFiles(bzrdir_transport,
 
234
            self._lock_file_name, self._lock_class)
 
235
        control_files.create_lock()
 
236
        return self.open(transport, _found=True)
 
237
 
 
238
    def _open(self, transport):
 
239
        return DummyForeignVcsDir(transport, self)
 
240
 
 
241
 
 
242
class DummyForeignVcsDir(BzrDirMeta1):
 
243
 
 
244
    def __init__(self, _transport, _format):
 
245
        self._format = _format
 
246
        self.transport = _transport.clone('.dummy')
 
247
        self.root_transport = _transport
 
248
        self._mode_check_done = False
 
249
        self._control_files = lockable_files.LockableFiles(self.transport,
 
250
            "lock", lockable_files.TransportLock)
 
251
 
 
252
    def open_branch(self, ignore_fallbacks=True):
 
253
        return self._format.get_branch_format().open(self, _found=True)
 
254
 
 
255
    def cloning_metadir(self, stacked=False):
 
256
        """Produce a metadir suitable for cloning with."""
 
257
        return format_registry.make_bzrdir("default")
 
258
 
 
259
    def sprout(self, url, revision_id=None, force_new_repo=False,
 
260
               recurse='down', possible_transports=None,
 
261
               accelerator_tree=None, hardlink=False, stacked=False,
 
262
               source_branch=None):
 
263
        # dirstate doesn't cope with accelerator_trees well 
 
264
        # that have a different control dir
 
265
        return super(DummyForeignVcsDir, self).sprout(url=url, 
 
266
                revision_id=revision_id, force_new_repo=force_new_repo, 
 
267
                recurse=recurse, possible_transports=possible_transports, 
 
268
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
 
269
 
 
270
 
 
271
class ForeignVcsRegistryTests(TestCase):
 
272
    """Tests for the ForeignVcsRegistry class."""
 
273
 
 
274
    def test_parse_revision_id_no_dash(self):
 
275
        reg = foreign.ForeignVcsRegistry()
 
276
        self.assertRaises(errors.InvalidRevisionId,
 
277
                          reg.parse_revision_id, "invalid")
 
278
 
 
279
    def test_parse_revision_id_unknown_mapping(self):
 
280
        reg = foreign.ForeignVcsRegistry()
 
281
        self.assertRaises(errors.InvalidRevisionId,
 
282
                          reg.parse_revision_id, "unknown-foreignrevid")
 
283
 
 
284
    def test_parse_revision_id(self):
 
285
        reg = foreign.ForeignVcsRegistry()
 
286
        vcs = DummyForeignVcs()
 
287
        reg.register("dummy", vcs, "Dummy VCS")
 
288
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
289
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
290
 
 
291
 
 
292
class ForeignRevisionTests(TestCase):
 
293
    """Tests for the ForeignRevision class."""
 
294
 
 
295
    def test_create(self):
 
296
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
 
297
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
 
298
                                      mapp, "roundtripped-revid")
 
299
        self.assertEquals("", rev.inventory_sha1)
 
300
        self.assertEquals(("a", "foreign", "revid"), rev.foreign_revid)
 
301
        self.assertEquals(mapp, rev.mapping)
 
302
 
 
303
 
 
304
class WorkingTreeFileUpdateTests(TestCaseWithTransport):
 
305
    """Tests for update_workingtree_fileids()."""
 
306
 
 
307
    def test_update_workingtree(self):
 
308
        wt = self.make_branch_and_tree('br1')
 
309
        self.build_tree_contents([('br1/bla', 'original contents\n')])
 
310
        wt.add('bla', 'bla-a')
 
311
        wt.commit('bla-a')
 
312
        target = wt.bzrdir.sprout('br2').open_workingtree()
 
313
        target.unversion(['bla-a'])
 
314
        target.add('bla', 'bla-b')
 
315
        target.commit('bla-b')
 
316
        target_basis = target.basis_tree()
 
317
        target_basis.lock_read()
 
318
        self.addCleanup(target_basis.unlock)
 
319
        foreign.update_workingtree_fileids(wt, target_basis)
 
320
        wt.lock_read()
 
321
        try:
 
322
            self.assertEquals(["TREE_ROOT", "bla-b"], list(wt.inventory))
 
323
        finally:
 
324
            wt.unlock()
 
325
 
 
326
 
 
327
class DummyForeignVcsTests(TestCaseWithTransport):
 
328
    """Very basic test for DummyForeignVcs."""
 
329
 
 
330
    def setUp(self):
 
331
        BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
332
        branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
333
        self.addCleanup(self.unregister)
 
334
        super(DummyForeignVcsTests, self).setUp()
 
335
 
 
336
    def unregister(self):
 
337
        try:
 
338
            BzrDirFormat.unregister_control_format(DummyForeignVcsDirFormat)
 
339
        except ValueError:
 
340
            pass
 
341
        branch.InterBranch.unregister_optimiser(InterToDummyVcsBranch)
 
342
 
 
343
    def test_create(self):
 
344
        """Test we can create dummies."""
 
345
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
346
        dir = BzrDir.open("d")
 
347
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
 
348
        dir.open_repository()
 
349
        dir.open_branch()
 
350
        dir.open_workingtree()
 
351
 
 
352
    def test_sprout(self):
 
353
        """Test we can clone dummies and that the format is not preserved."""
 
354
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
355
        dir = BzrDir.open("d")
 
356
        newdir = dir.sprout("e")
 
357
        self.assertNotEquals("A Dummy VCS Dir", newdir._format.get_format_string())
 
358
 
 
359
    def test_lossy_push_empty(self):
 
360
        source_tree = self.make_branch_and_tree("source")
 
361
        target_tree = self.make_branch_and_tree("target", 
 
362
            format=DummyForeignVcsDirFormat())
 
363
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
364
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
 
365
        self.assertEquals(NULL_REVISION, pushresult.new_revid)
 
366
        self.assertEquals({}, pushresult.revidmap)
 
367
 
 
368
    def test_lossy_push_simple(self):
 
369
        source_tree = self.make_branch_and_tree("source")
 
370
        self.build_tree(['source/a', 'source/b'])
 
371
        source_tree.add(['a', 'b'])
 
372
        revid1 = source_tree.commit("msg")
 
373
        target_tree = self.make_branch_and_tree("target", 
 
374
            format=DummyForeignVcsDirFormat())
 
375
        target_tree.branch.lock_write()
 
376
        try:
 
377
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
378
        finally:
 
379
            target_tree.branch.unlock()
 
380
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
 
381
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
 
382
                           pushresult.revidmap)
 
383
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)