/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Robert Collins
  • Date: 2007-07-15 15:40:37 UTC
  • mto: (2592.3.33 repository)
  • mto: This revision was merged to the branch mainline in revision 2624.
  • Revision ID: robertc@robertcollins.net-20070715154037-3ar8g89decddc9su
Make GraphIndex accept nodes as key, value, references, so that the method
signature is closer to what a simple key->value index delivers. Also
change the behaviour when the reference list count is zero to accept
key, value as nodes, and emit key, value to make it identical in that case
to a simple key->value index. This may not be a good idea, but for now it
seems ok.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
"""Tests for foreign VCS utility code."""
19
 
 
20
 
 
21
 
from bzrlib import (
22
 
    branch,
23
 
    bzrdir,
24
 
    errors,
25
 
    foreign,
26
 
    lockable_files,
27
 
    lockdir,
28
 
    revision,
29
 
    tests,
30
 
    trace,
31
 
    )
32
 
 
33
 
# This is the dummy foreign revision control system, used 
34
 
# mainly here in the testsuite to test the foreign VCS infrastructure.
35
 
# It is basically standard Bazaar with some minor modifications to 
36
 
# make it "foreign". 
37
 
38
 
# It differs from "regular" Bazaar in the following ways:
39
 
# - The control directory is named ".dummy", not ".bzr".
40
 
# - The revision ids are tuples, not strings.
41
 
# - It does not natively support more than one parent.
42
 
 
43
 
 
44
 
class DummyForeignVcsMapping(foreign.VcsMapping):
    """A simple mapping for the dummy Foreign VCS, for use with testing.

    Bazaar revision ids handled by this mapping look like
    ``dummy-v1:<a>-<b>-<c>``; the matching foreign revision id is the
    tuple of the dash-separated parts.
    """

    def __eq__(self, other):
        """Return True when ``other`` is of exactly the same type."""
        return type(self) == type(other)

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        Python 2 does not derive ``!=`` from ``==``; without this method
        two equal mappings would also compare as "not equal".
        """
        return not self.__eq__(other)

    def revision_id_bzr_to_foreign(self, bzr_revid):
        """Convert a Bazaar revision id into ``(foreign_revid, mapping)``.

        The first ``len("dummy-v1:")`` characters are dropped without being
        checked, and the remainder is split on ``-`` into a tuple.

        :param bzr_revid: Bazaar revision id string.
        :return: Tuple of (tuple of strings, this mapping).
        """
        return tuple(bzr_revid[len("dummy-v1:"):].split("-")), self

    def revision_id_foreign_to_bzr(self, foreign_revid):
        """Convert a three-element foreign revision id into a Bazaar one.

        :param foreign_revid: Tuple with exactly three elements, as required
            by the ``%`` formatting below.
        :return: Bazaar revision id string.
        """
        return "dummy-v1:%s-%s-%s" % foreign_revid
55
 
 
56
 
 
57
 
class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
    """Registry of the mappings available for the dummy foreign VCS."""

    def revision_id_bzr_to_foreign(self, revid):
        """Parse ``revid`` with the mapping named by its version tag.

        :param revid: Bazaar revision id string.
        :return: Whatever the selected mapping's
            ``revision_id_bzr_to_foreign`` returns.
        :raises errors.InvalidRevisionId: if ``revid`` does not start with
            ``dummy-``.
        """
        prefix = "dummy-"
        if not revid.startswith(prefix):
            raise errors.InvalidRevisionId(revid, None)
        # The version tag is the two characters right after the prefix,
        # e.g. "v1" in "dummy-v1:...".
        version_start = len(prefix)
        version_end = len("dummy-vx")
        mapping = self.get(revid[version_start:version_end])
        return mapping.revision_id_bzr_to_foreign(revid)
65
 
 
66
 
 
67
 
class DummyForeignVcs(foreign.ForeignVcs):
    """A dummy Foreign VCS, for use with testing.

    Its revision ids are tuples of three strings.
    """

    def __init__(self):
        """Create the VCS with a registry holding the single "v1" mapping."""
        registry = DummyForeignVcsMappingRegistry()
        self.mapping_registry = registry
        registry.register("v1", DummyForeignVcsMapping(self), "Version 1")
        self.abbreviation = "dummy"

    def show_foreign_revid(self, foreign_revid):
        """Return a one-entry dict describing ``foreign_revid``.

        :param foreign_revid: Tuple of three elements.
        :return: Dict mapping "dummy ding" to the formatted revision id.
        """
        rendered = "%s/%s\\%s" % foreign_revid
        return {"dummy ding": rendered}

    def serialize_foreign_revid(self, foreign_revid):
        """Return the three parts of ``foreign_revid`` joined with ``|``."""
        serialized = "%s|%s|%s" % foreign_revid
        return serialized
84
 
 
85
 
 
86
 
class DummyForeignVcsBranch(branch.BzrBranch6, foreign.ForeignBranch):
    """A Dummy VCS Branch."""

    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
        """Set up the branch and attach a dummy foreign mapping to it.

        :param _format: Branch format object for this branch.
        :param _control_files: Control files passed on to ``BzrBranch6``.
        :param a_bzrdir: Control directory containing the branch.
        """
        # These attributes are assigned before either base initialiser
        # runs; that ordering is kept from the original code.
        self._format = _format
        self._base = a_bzrdir.transport.base
        self._ignore_fallbacks = False
        self.bzrdir = a_bzrdir
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
        foreign.ForeignBranch.__init__(self, mapping)
        branch.BzrBranch6.__init__(
            self, _format, _control_files, a_bzrdir, *args, **kwargs)
98
 
 
99
 
 
100
 
class InterToDummyVcsBranch(branch.GenericInterBranch,
                            foreign.InterToForeignBranch):
    """InterBranch that pushes from any branch into a dummy foreign branch.

    Regular ``push`` is refused; only ``lossy_push`` is implemented.
    """

    @staticmethod
    def is_compatible(source, target):
        """Return True when ``target`` is a ``DummyForeignVcsBranch``."""
        return isinstance(target, DummyForeignVcsBranch)

    def push(self, overwrite=False, stop_revision=None):
        """Always raise ``errors.NoRoundtrippingSupport``."""
        raise errors.NoRoundtrippingSupport(self.source, self.target)

    def lossy_push(self, stop_revision=None):
        """Re-commit the source revisions missing from the target.

        Each missing revision is committed again in the target under a
        freshly generated dummy revision id.

        :param stop_revision: Accepted for interface compatibility; this
            implementation does not use it.
        :return: A ``branch.BranchPushResult`` whose ``revidmap`` maps each
            pushed source revision id to the id it received in the target.
        :raises errors.DivergedBranches: if the target's history is not a
            prefix of the source's history.
        """
        result = branch.BranchPushResult()
        result.source_branch = self.source
        result.target_branch = self.target
        result.old_revno, result.old_revid = self.target.last_revision_info()
        self.source.lock_read()
        try:
            # This just handles simple cases, but that's good enough for tests
            my_history = self.target.revision_history()
            their_history = self.source.revision_history()
            # Slicing clamps to the sequence length, so no min() is needed.
            if their_history[:len(my_history)] != my_history:
                raise errors.DivergedBranches(self.target, self.source)
            todo = their_history[len(my_history):]
            revidmap = {}
            for revid in todo:
                rev = self.source.repository.get_revision(revid)
                tree = self.source.repository.revision_tree(revid)

                # Give the tree a get_file_with_stat that reports no stat
                # information.
                def get_file_with_stat(file_id, path=None):
                    return (tree.get_file(file_id), None)
                tree.get_file_with_stat = get_file_with_stat
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
                    (str(rev.timestamp), str(rev.timezone),
                     str(self.target.revno())))
                parent_revno, parent_revid = self.target.last_revision_info()
                if parent_revid == revision.NULL_REVISION:
                    parent_revids = []
                else:
                    parent_revids = [parent_revid]
                builder = self.target.get_commit_builder(
                    parent_revids, self.target.get_config(), rev.timestamp,
                    rev.timezone, rev.committer, rev.properties, new_revid)
                try:
                    # The parent inventory is the same for every entry of
                    # this revision, so look it up once rather than per
                    # entry.
                    parent_invs = [self.target.repository.revision_tree(
                        parent_revid).inventory]
                    for path, ie in tree.inventory.iter_entries():
                        new_ie = ie.copy()
                        new_ie.revision = None
                        builder.record_entry_contents(
                            new_ie, parent_invs, path, tree,
                            (ie.kind, ie.text_size, ie.executable,
                             ie.text_sha1))
                    builder.finish_inventory()
                except:
                    # Deliberately bare: abort the commit on any failure,
                    # then let the original exception propagate.
                    builder.abort()
                    raise
                revidmap[revid] = builder.commit(rev.message)
                self.target.set_last_revision_info(parent_revno + 1,
                                                   revidmap[revid])
                trace.mutter('lossily pushed revision %s -> %s',
                             revid, revidmap[revid])
        finally:
            self.source.unlock()
        result.new_revno, result.new_revid = self.target.last_revision_info()
        result.revidmap = revidmap
        return result
164
 
 
165
 
 
166
 
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
    """Branch format used by the dummy foreign VCS."""

    def get_format_string(self):
        """Return the format string identifying this branch format."""
        return "Branch for Testing"

    def __init__(self):
        """Initialise the format and pair it with the dummy dir format."""
        super(DummyForeignVcsBranchFormat, self).__init__()
        self._matchingbzrdir = DummyForeignVcsDirFormat()

    def open(self, a_bzrdir, name=None, _found=False):
        """Open the dummy branch living in ``a_bzrdir``.

        :param a_bzrdir: Control directory to open the branch from.
        :param name: Branch name, passed on to ``get_branch_transport``.
        :param _found: Must be true; format probing is not implemented.
        :return: A ``DummyForeignVcsBranch``.
        :raises NotImplementedError: if ``_found`` is false.
        :raises errors.NotBranchError: if a required file is missing.
        """
        if not _found:
            raise NotImplementedError
        # Start with the control directory's base so the error below can
        # always name a path, even when the branch transport lookup itself
        # fails (previously that case hit an unbound local variable).
        path = a_bzrdir.transport.base
        try:
            transport = a_bzrdir.get_branch_transport(None, name=name)
            path = transport.base
            control_files = lockable_files.LockableFiles(transport, 'lock',
                                                         lockdir.LockDir)
            return DummyForeignVcsBranch(
                _format=self,
                _control_files=control_files,
                a_bzrdir=a_bzrdir,
                _repository=a_bzrdir.find_repository())
        except errors.NoSuchFile:
            raise errors.NotBranchError(path=path)
188
 
 
189
 
 
190
 
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat for the dummy foreign VCS.

    The control directory is called ``.dummy``.
    """

    @classmethod
    def get_format_string(cls):
        """Return the string identifying this control directory format."""
        return "A Dummy VCS Dir"

    @classmethod
    def get_format_description(cls):
        """Return the human readable description of this format."""
        return "A Dummy VCS Dir"

    @classmethod
    def is_supported(cls):
        """Report this format as supported."""
        return True

    def get_branch_format(self):
        """Return a new ``DummyForeignVcsBranchFormat``."""
        return DummyForeignVcsBranchFormat()

    @classmethod
    def probe_transport(klass, transport):
        """Return the .bzrdir style format present in a directory.

        :raises errors.NotBranchError: if ``transport`` has no ``.dummy``
            entry.
        """
        if transport.has('.dummy'):
            return klass()
        raise errors.NotBranchError(path=transport.base)

    def initialize_on_transport(self, transport):
        """Initialize a new bzrdir in the base directory of a Transport.

        Creates the ``.dummy`` directory and its lock, then opens and
        returns the new control directory.
        """
        # There is no .bzr directory to inherit from, so take the
        # directory mode from the root directory.
        root_control = lockable_files.LockableFiles(
            transport, '', lockable_files.TransportLock)
        # FIXME: RBC 20060121 don't peek under the covers
        root_control._transport.mkdir('.dummy', mode=root_control._dir_mode)
        del root_control
        dummy_transport = transport.clone('.dummy')
        # NB: no need to escape relative paths that are url safe.
        lock_files = lockable_files.LockableFiles(
            dummy_transport, self._lock_file_name, self._lock_class)
        lock_files.create_lock()
        return self.open(transport, _found=True)

    def _open(self, transport):
        """Return a ``DummyForeignVcsDir`` for ``transport``."""
        return DummyForeignVcsDir(transport, self)
235
 
 
236
 
 
237
 
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
    """Control directory of the dummy foreign VCS, stored in ``.dummy``."""

    def __init__(self, _transport, _format):
        """Attach to the ``.dummy`` directory below ``_transport``.

        :param _transport: Transport of the directory containing ``.dummy``.
        :param _format: The control directory format object.
        """
        self._format = _format
        self.transport = _transport.clone('.dummy')
        self.root_transport = _transport
        self._mode_check_done = False
        self._control_files = lockable_files.LockableFiles(
            self.transport, "lock", lockable_files.TransportLock)

    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
        """Open the single branch held by this control directory.

        :raises errors.NoColocatedBranchSupport: if ``name`` is given.
        """
        if name is not None:
            raise errors.NoColocatedBranchSupport(self)
        branch_format = self._format.get_branch_format()
        return branch_format.open(self, _found=True)

    def cloning_metadir(self, stacked=False):
        """Produce a metadir suitable for cloning with."""
        return bzrdir.format_registry.make_bzrdir("default")

    def sprout(self, url, revision_id=None, force_new_repo=False,
               recurse='down', possible_transports=None,
               accelerator_tree=None, hardlink=False, stacked=False,
               source_branch=None):
        """Sprout via the base class, without the accelerator tree."""
        # dirstate doesn't cope well with accelerator_trees that have a
        # different control dir, so accelerator_tree is not forwarded.
        parent = super(DummyForeignVcsDir, self)
        return parent.sprout(
            url=url,
            revision_id=revision_id,
            force_new_repo=force_new_repo,
            recurse=recurse,
            possible_transports=possible_transports,
            hardlink=hardlink,
            stacked=stacked,
            source_branch=source_branch)
266
 
 
267
 
 
268
 
def register_dummy_foreign_for_test(testcase):
    """Register the dummy foreign VCS for the duration of ``testcase``.

    The dummy control directory format and the ``InterToDummyVcsBranch``
    optimiser are registered, and a cleanup is added to ``testcase`` for
    each so that both are unregistered again afterwards.

    :param testcase: Test case providing ``addCleanup``.
    """
    dir_format_cls = bzrdir.BzrDirFormat
    dir_format_cls.register_control_format(DummyForeignVcsDirFormat)
    testcase.addCleanup(dir_format_cls.unregister_control_format,
                        DummyForeignVcsDirFormat)
    # The optimiser has to be registered as well, so that the dummy really
    # appears different from a regular bzr repository.
    inter_branch_cls = branch.InterBranch
    inter_branch_cls.register_optimiser(InterToDummyVcsBranch)
    testcase.addCleanup(inter_branch_cls.unregister_optimiser,
                        InterToDummyVcsBranch)
277
 
 
278
 
 
279
 
class ForeignVcsRegistryTests(tests.TestCase):
    """Tests for the ForeignVcsRegistry class."""

    def test_parse_revision_id_no_dash(self):
        """A revision id without a dash is rejected."""
        registry = foreign.ForeignVcsRegistry()
        self.assertRaises(
            errors.InvalidRevisionId, registry.parse_revision_id, "invalid")

    def test_parse_revision_id_unknown_mapping(self):
        """A revision id whose prefix names no registered VCS is rejected."""
        registry = foreign.ForeignVcsRegistry()
        self.assertRaises(
            errors.InvalidRevisionId,
            registry.parse_revision_id, "unknown-foreignrevid")

    def test_parse_revision_id(self):
        """A dummy revision id parses into its tuple and mapping."""
        registry = foreign.ForeignVcsRegistry()
        dummy_vcs = DummyForeignVcs()
        registry.register("dummy", dummy_vcs, "Dummy VCS")
        expected = (("some", "foreign", "revid"),
                    DummyForeignVcsMapping(dummy_vcs))
        self.assertEquals(
            expected,
            registry.parse_revision_id("dummy-v1:some-foreign-revid"))
298
 
 
299
 
 
300
 
class ForeignRevisionTests(tests.TestCase):
    """Tests for the ForeignRevision class."""

    def test_create(self):
        """A new ForeignRevision exposes the values it was built with."""
        foreign_revid = ("a", "foreign", "revid")
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
        rev = foreign.ForeignRevision(
            ("a", "foreign", "revid"), mapping, "roundtripped-revid")
        self.assertEquals("", rev.inventory_sha1)
        self.assertEquals(foreign_revid, rev.foreign_revid)
        self.assertEquals(mapping, rev.mapping)
310
 
 
311
 
 
312
 
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
    """Tests for update_workingtree_fileids()."""

    def test_update_workingtree(self):
        """File ids in the working tree are replaced by the target's."""
        wt = self.make_branch_and_tree('br1')
        self.build_tree_contents([('br1/bla', 'original contents\n')])
        wt.add('bla', 'bla-a')
        wt.commit('bla-a')
        root_id = wt.get_root_id()

        # Sprout a second tree and re-add the same path under a new file id.
        sprouted_dir = wt.bzrdir.sprout('br2')
        target = sprouted_dir.open_workingtree()
        target.unversion(['bla-a'])
        target.add('bla', 'bla-b')
        target.commit('bla-b')

        target_basis = target.basis_tree()
        target_basis.lock_read()
        self.addCleanup(target_basis.unlock)
        foreign.update_workingtree_fileids(wt, target_basis)

        expected_ids = set([root_id, "bla-b"])
        wt.lock_read()
        try:
            self.assertEquals(expected_ids, set(wt.inventory))
        finally:
            wt.unlock()
334
 
 
335
 
 
336
 
class DummyForeignVcsTests(tests.TestCaseWithTransport):
    """Very basic test for DummyForeignVcs."""

    def setUp(self):
        """Register the dummy foreign VCS for every test in this class."""
        super(DummyForeignVcsTests, self).setUp()
        register_dummy_foreign_for_test(self)

    def test_create(self):
        """Test we can create dummies."""
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
        control_dir = bzrdir.BzrDir.open("d")
        format_string = control_dir._format.get_format_string()
        self.assertEquals("A Dummy VCS Dir", format_string)
        control_dir.open_repository()
        control_dir.open_branch()
        control_dir.open_workingtree()

    def test_sprout(self):
        """Test we can clone dummies and that the format is not preserved."""
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
        control_dir = bzrdir.BzrDir.open("d")
        sprouted = control_dir.sprout("e")
        self.assertNotEquals(
            "A Dummy VCS Dir", sprouted._format.get_format_string())

    def test_push_not_supported(self):
        """A regular push into a dummy branch is refused."""
        source_tree = self.make_branch_and_tree("source")
        dummy_format = DummyForeignVcsDirFormat()
        target_tree = self.make_branch_and_tree("target", format=dummy_format)
        self.assertRaises(
            errors.NoRoundtrippingSupport,
            source_tree.branch.push, target_tree.branch)

    def test_lossy_push_empty(self):
        """Lossy-pushing an empty branch changes nothing."""
        source_tree = self.make_branch_and_tree("source")
        dummy_format = DummyForeignVcsDirFormat()
        target_tree = self.make_branch_and_tree("target", format=dummy_format)
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
        self.assertEquals({}, pushresult.revidmap)

    def test_lossy_push_simple(self):
        """Lossy-pushing one revision maps it to the new target tip."""
        source_tree = self.make_branch_and_tree("source")
        self.build_tree(['source/a', 'source/b'])
        source_tree.add(['a', 'b'])
        revid1 = source_tree.commit("msg")
        dummy_format = DummyForeignVcsDirFormat()
        target_tree = self.make_branch_and_tree("target", format=dummy_format)
        target_tree.branch.lock_write()
        try:
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
        finally:
            target_tree.branch.unlock()
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
        expected_map = {revid1: target_tree.branch.last_revision()}
        self.assertEquals(expected_map, pushresult.revidmap)
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)