/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: Aaron Bentley
  • Date: 2006-04-07 22:46:52 UTC
  • mfrom: (1645 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1727.
  • Revision ID: aaron.bentley@utoronto.ca-20060407224652-4925bc3735b926f8
Merged latest bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# (C) 2005 Canonical Ltd
 
1
# (C) 2005, 2006 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
17
19
import os
 
20
import sys
18
21
 
 
22
import bzrlib.branch
 
23
import bzrlib.bzrdir as bzrdir
19
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
 
from bzrlib.clone import copy_branch
21
25
from bzrlib.commit import commit
22
26
import bzrlib.errors as errors
23
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
27
from bzrlib.errors import (FileExists,
 
28
                           NoSuchRevision,
 
29
                           NoSuchFile,
 
30
                           UninitializableFormat,
 
31
                           NotBranchError,
 
32
                           )
24
33
import bzrlib.gpg
25
 
from bzrlib.tests import TestCase, TestCaseInTempDir
26
 
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
34
from bzrlib.osutils import getcwd
 
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
27
36
from bzrlib.trace import mutter
28
37
import bzrlib.transactions as transactions
29
 
from bzrlib.revision import NULL_REVISION
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
from bzrlib.upgrade import upgrade
 
42
from bzrlib.workingtree import WorkingTree
 
43
 
30
44
 
31
45
# TODO: Make a branch using basis branch, and check that it 
32
46
# doesn't request any files that could have been avoided, by 
33
47
# hooking into the Transport.
34
48
 
35
 
class TestBranch(TestCaseInTempDir):
 
49
 
 
50
class TestCaseWithBranch(TestCaseWithTransport):
 
51
 
 
52
    def setUp(self):
 
53
        super(TestCaseWithBranch, self).setUp()
 
54
        self.branch = None
 
55
 
 
56
    def get_branch(self):
 
57
        if self.branch is None:
 
58
            self.branch = self.make_branch(None)
 
59
        return self.branch
 
60
 
 
61
    def make_branch(self, relpath):
 
62
        repo = self.make_repository(relpath)
 
63
        # fixme RBC 20060210 this isn't necessarily a fixable thing,
 
64
        # Skipped is the wrong exception to raise.
 
65
        try:
 
66
            return self.branch_format.initialize(repo.bzrdir)
 
67
        except errors.UninitializableFormat:
 
68
            raise TestSkipped('Uninitializable branch format')
 
69
 
 
70
    def make_repository(self, relpath, shared=False):
 
71
        try:
 
72
            url = self.get_url(relpath)
 
73
            segments = url.split('/')
 
74
            if segments and segments[-1] not in ('', '.'):
 
75
                parent = '/'.join(segments[:-1])
 
76
                t = get_transport(parent)
 
77
                try:
 
78
                    t.mkdir(segments[-1])
 
79
                except FileExists:
 
80
                    pass
 
81
            made_control = self.bzrdir_format.initialize(url)
 
82
            return made_control.create_repository(shared=shared)
 
83
        except UninitializableFormat:
 
84
            raise TestSkipped("Format %s is not initializable.")
 
85
 
 
86
 
 
87
class TestBranch(TestCaseWithBranch):
36
88
 
37
89
    def test_append_revisions(self):
38
90
        """Test appending more than one revision"""
39
 
        br = Branch.initialize(u".")
 
91
        br = self.get_branch()
40
92
        br.append_revision("rev1")
41
93
        self.assertEquals(br.revision_history(), ["rev1",])
42
94
        br.append_revision("rev2", "rev3")
44
96
 
45
97
    def test_fetch_revisions(self):
46
98
        """Test fetch-revision operation."""
47
 
        from bzrlib.fetch import Fetcher
48
 
        os.mkdir('b1')
49
 
        os.mkdir('b2')
50
 
        b1 = Branch.initialize('b1')
51
 
        b2 = Branch.initialize('b2')
52
 
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
53
 
        b1.working_tree().add(['foo'], ['foo-id'])
54
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
99
        get_transport(self.get_url()).mkdir('b1')
 
100
        get_transport(self.get_url()).mkdir('b2')
 
101
        wt = self.make_branch_and_tree('b1')
 
102
        b1 = wt.branch
 
103
        b2 = self.make_branch('b2')
 
104
        file('b1/foo', 'w').write('hello')
 
105
        wt.add(['foo'], ['foo-id'])
 
106
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
55
107
 
56
108
        mutter('start fetch')
57
 
        f = Fetcher(from_branch=b1, to_branch=b2)
58
 
        eq = self.assertEquals
59
 
        eq(f.count_copied, 1)
60
 
        eq(f.last_revision, 'revision-1')
61
 
 
62
 
        rev = b2.get_revision('revision-1')
63
 
        tree = b2.revision_tree('revision-1')
64
 
        eq(tree.get_file_text('foo-id'), 'hello')
65
 
 
66
 
    def test_revision_tree(self):
67
 
        b1 = Branch.initialize(u'.')
68
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
69
 
        tree = b1.revision_tree('revision-1')
70
 
        tree = b1.revision_tree(None)
71
 
        self.assertEqual(len(tree.list_files()), 0)
72
 
        tree = b1.revision_tree(NULL_REVISION)
73
 
        self.assertEqual(len(tree.list_files()), 0)
74
 
 
75
 
    def get_unbalanced_branch_pair(self):
 
109
        self.assertEqual((1, []), b2.fetch(b1))
 
110
 
 
111
        rev = b2.repository.get_revision('revision-1')
 
112
        tree = b2.repository.revision_tree('revision-1')
 
113
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
 
114
 
 
115
    def get_unbalanced_tree_pair(self):
76
116
        """Return two branches, a and b, with one file in a."""
77
 
        os.mkdir('a')
78
 
        br_a = Branch.initialize("a")
 
117
        get_transport(self.get_url()).mkdir('a')
 
118
        tree_a = self.make_branch_and_tree('a')
79
119
        file('a/b', 'wb').write('b')
80
 
        br_a.working_tree().add('b')
81
 
        commit(br_a, "silly commit", rev_id='A')
82
 
        os.mkdir('b')
83
 
        br_b = Branch.initialize("b")
84
 
        return br_a, br_b
 
120
        tree_a.add('b')
 
121
        tree_a.commit("silly commit", rev_id='A')
 
122
 
 
123
        get_transport(self.get_url()).mkdir('b')
 
124
        tree_b = self.make_branch_and_tree('b')
 
125
        return tree_a, tree_b
85
126
 
86
127
    def get_balanced_branch_pair(self):
87
128
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
88
 
        br_a, br_b = self.get_unbalanced_branch_pair()
89
 
        br_a.push_stores(br_b)
90
 
        return br_a, br_b
91
 
 
92
 
    def test_push_stores(self):
93
 
        """Copy the stores from one branch to another"""
94
 
        br_a, br_b = self.get_unbalanced_branch_pair()
95
 
        # ensure the revision is missing.
96
 
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
97
 
                          br_a.revision_history()[0])
98
 
        br_a.push_stores(br_b)
99
 
        # check that b now has all the data from a's first commit.
100
 
        rev = br_b.get_revision(br_a.revision_history()[0])
101
 
        tree = br_b.revision_tree(br_a.revision_history()[0])
102
 
        for file_id in tree:
103
 
            if tree.inventory[file_id].kind == "file":
104
 
                tree.get_file(file_id).read()
105
 
        return br_a, br_b
106
 
 
107
 
    def test_copy_branch(self):
108
 
        """Copy the stores from one branch to another"""
109
 
        br_a, br_b = self.get_balanced_branch_pair()
110
 
        commit(br_b, "silly commit")
 
129
        tree_a, tree_b = self.get_unbalanced_tree_pair()
 
130
        tree_b.branch.repository.fetch(tree_a.branch.repository)
 
131
        return tree_a, tree_b
 
132
 
 
133
    def test_clone_branch(self):
 
134
        """Copy the stores from one branch to another"""
 
135
        tree_a, tree_b = self.get_balanced_branch_pair()
 
136
        tree_b.commit("silly commit")
111
137
        os.mkdir('c')
112
 
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
113
 
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
138
        # this fails to test that the history from a was not used.
 
139
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
 
140
        self.assertEqual(tree_a.branch.revision_history(),
 
141
                         dir_c.open_branch().revision_history())
114
142
 
115
 
    def test_copy_partial(self):
 
143
    def test_clone_partial(self):
116
144
        """Copy only part of the history of a branch."""
117
 
        self.build_tree(['a/', 'a/one'])
118
 
        br_a = Branch.initialize('a')
119
 
        br_a.working_tree().add(['one'])
120
 
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
121
 
        self.build_tree(['a/two'])
122
 
        br_a.working_tree().add(['two'])
123
 
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
124
 
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
125
 
        self.assertEqual(br_b.last_revision(), 'u@d-1')
126
 
        self.assertTrue(os.path.exists('b/one'))
127
 
        self.assertFalse(os.path.exists('b/two'))
 
145
        # TODO: RBC 20060208 test with a revision not on revision-history.
 
146
        #       what should that behaviour be ? Emailed the list.
 
147
        wt_a = self.make_branch_and_tree('a')
 
148
        self.build_tree(['a/one'])
 
149
        wt_a.add(['one'])
 
150
        wt_a.commit('commit one', rev_id='1')
 
151
        self.build_tree(['a/two'])
 
152
        wt_a.add(['two'])
 
153
        wt_a.commit('commit two', rev_id='2')
 
154
        repo_b = self.make_repository('b')
 
155
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
156
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
 
157
        self.assertEqual(br_b.last_revision(), '1')
 
158
 
 
159
    def test_sprout_partial(self):
 
160
        # test sprouting with a prefix of the revision-history.
 
161
        # also needs not-on-revision-history behaviour defined.
 
162
        wt_a = self.make_branch_and_tree('a')
 
163
        self.build_tree(['a/one'])
 
164
        wt_a.add(['one'])
 
165
        wt_a.commit('commit one', rev_id='1')
 
166
        self.build_tree(['a/two'])
 
167
        wt_a.add(['two'])
 
168
        wt_a.commit('commit two', rev_id='2')
 
169
        repo_b = self.make_repository('b')
 
170
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
171
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
 
172
        self.assertEqual(br_b.last_revision(), '1')
 
173
 
 
174
    def test_clone_branch_nickname(self):
 
175
        # test the nick name is preserved always
 
176
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
177
 
 
178
    def test_clone_branch_parent(self):
 
179
        # test the parent is preserved always
 
180
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
181
        
 
182
    def test_sprout_branch_nickname(self):
 
183
        # test the nick name is reset always
 
184
        raise TestSkipped('XXX branch sprouting is not yet tested..')
 
185
 
 
186
    def test_sprout_branch_parent(self):
 
187
        source = self.make_branch('source')
 
188
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
 
189
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
128
190
        
129
191
    def test_record_initial_ghost_merge(self):
130
192
        """A pending merge with no revision present is still a merge."""
131
 
        branch = Branch.initialize(u'.')
132
 
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
133
 
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
134
 
        rev = branch.get_revision(branch.last_revision())
 
193
        wt = self.make_branch_and_tree('.')
 
194
        branch = wt.branch
 
195
        wt.add_pending_merge('non:existent@rev--ision--0--2')
 
196
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
 
197
        rev = branch.repository.get_revision(branch.last_revision())
135
198
        self.assertEqual(len(rev.parent_ids), 1)
136
199
        # parent_sha1s is not populated now, WTF. rbc 20051003
137
200
        self.assertEqual(len(rev.parent_sha1s), 0)
138
201
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
139
202
 
140
203
    def test_bad_revision(self):
141
 
        branch = Branch.initialize(u'.')
142
 
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
204
        self.assertRaises(errors.InvalidRevisionId,
 
205
                          self.get_branch().repository.get_revision,
 
206
                          None)
143
207
 
144
208
# TODO 20051003 RBC:
145
209
# compare the gpg-to-sign info for a commit with a ghost and 
148
212
        
149
213
    def test_pending_merges(self):
150
214
        """Tracking pending-merged revisions."""
151
 
        b = Branch.initialize(u'.')
152
 
        wt = b.working_tree()
 
215
        wt = self.make_branch_and_tree('.')
 
216
        b = wt.branch
153
217
        self.assertEquals(wt.pending_merges(), [])
154
218
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
155
219
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
159
223
        self.assertEquals(wt.pending_merges(),
160
224
                          ['foo@azkhazan-123123-abcabc',
161
225
                           'wibble@fofof--20050401--1928390812'])
162
 
        b.working_tree().commit("commit from base with two merges")
163
 
        rev = b.get_revision(b.revision_history()[0])
 
226
        wt.commit("commit from base with two merges")
 
227
        rev = b.repository.get_revision(b.revision_history()[0])
164
228
        self.assertEquals(len(rev.parent_ids), 2)
165
229
        self.assertEquals(rev.parent_ids[0],
166
230
                          'foo@azkhazan-123123-abcabc')
170
234
        self.assertEquals(wt.pending_merges(), [])
171
235
 
172
236
    def test_sign_existing_revision(self):
173
 
        branch = Branch.initialize(u'.')
174
 
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
237
        wt = self.make_branch_and_tree('.')
 
238
        branch = wt.branch
 
239
        wt.commit("base", allow_pointless=True, rev_id='A')
175
240
        from bzrlib.testament import Testament
176
 
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
177
 
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
178
 
                         branch.revision_store.get('A', 'sig').read())
 
241
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
 
242
        branch.repository.sign_revision('A', strategy)
 
243
        self.assertEqual(Testament.from_revision(branch.repository, 
 
244
                         'A').as_short_text(),
 
245
                         branch.repository.get_signature_text('A'))
179
246
 
180
247
    def test_store_signature(self):
181
 
        branch = Branch.initialize(u'.')
182
 
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
183
 
                                        'FOO', 'A')
184
 
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
248
        wt = self.make_branch_and_tree('.')
 
249
        branch = wt.branch
 
250
        branch.repository.store_revision_signature(
 
251
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
252
        self.assertRaises(errors.NoSuchRevision,
 
253
                          branch.repository.has_signature_for_revision_id,
 
254
                          'A')
 
255
        wt.commit("base", allow_pointless=True, rev_id='A')
 
256
        self.assertEqual('FOO', 
 
257
                         branch.repository.get_signature_text('A'))
185
258
 
186
 
    def test__relcontrolfilename(self):
187
 
        branch = Branch.initialize(u'.')
188
 
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
189
 
        
190
 
    def test__relcontrolfilename_empty(self):
191
 
        branch = Branch.initialize(u'.')
192
 
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
259
    def test_branch_keeps_signatures(self):
 
260
        wt = self.make_branch_and_tree('source')
 
261
        wt.commit('A', allow_pointless=True, rev_id='A')
 
262
        wt.branch.repository.sign_revision('A',
 
263
            bzrlib.gpg.LoopbackGPGStrategy(None))
 
264
        #FIXME: clone should work to urls,
 
265
        # wt.clone should work to disks.
 
266
        self.build_tree(['target/'])
 
267
        d2 = wt.bzrdir.clone('target')
 
268
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
 
269
                         d2.open_repository().get_signature_text('A'))
193
270
 
194
271
    def test_nicks(self):
195
272
        """Branch nicknames"""
196
 
        os.mkdir('bzr.dev')
197
 
        branch = Branch.initialize('bzr.dev')
 
273
        t = get_transport(self.get_url())
 
274
        t.mkdir('bzr.dev')
 
275
        branch = self.make_branch('bzr.dev')
198
276
        self.assertEqual(branch.nick, 'bzr.dev')
199
 
        os.rename('bzr.dev', 'bzr.ab')
200
 
        branch = Branch.open('bzr.ab')
 
277
        t.move('bzr.dev', 'bzr.ab')
 
278
        branch = Branch.open(self.get_url('bzr.ab'))
201
279
        self.assertEqual(branch.nick, 'bzr.ab')
202
280
        branch.nick = "Aaron's branch"
203
281
        branch.nick = "Aaron's branch"
204
 
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
282
        self.failUnless(
 
283
            t.has(
 
284
                t.relpath(
 
285
                    branch.control_files.controlfilename("branch.conf")
 
286
                    )
 
287
                )
 
288
            )
205
289
        self.assertEqual(branch.nick, "Aaron's branch")
206
 
        os.rename('bzr.ab', 'integration')
207
 
        branch = Branch.open('integration')
 
290
        t.move('bzr.ab', 'integration')
 
291
        branch = Branch.open(self.get_url('integration'))
208
292
        self.assertEqual(branch.nick, "Aaron's branch")
209
293
        branch.nick = u"\u1234"
210
294
        self.assertEqual(branch.nick, u"\u1234")
211
295
 
212
296
    def test_commit_nicks(self):
213
297
        """Nicknames are committed to the revision"""
214
 
        os.mkdir('bzr.dev')
215
 
        branch = Branch.initialize('bzr.dev')
 
298
        get_transport(self.get_url()).mkdir('bzr.dev')
 
299
        wt = self.make_branch_and_tree('bzr.dev')
 
300
        branch = wt.branch
216
301
        branch.nick = "My happy branch"
217
 
        branch.working_tree().commit('My commit respect da nick.')
218
 
        committed = branch.get_revision(branch.last_revision())
 
302
        wt.commit('My commit respect da nick.')
 
303
        committed = branch.repository.get_revision(branch.last_revision())
219
304
        self.assertEqual(committed.properties["branch-nick"], 
220
305
                         "My happy branch")
221
306
 
222
 
 
223
 
class TestRemote(TestCaseWithWebserver):
 
307
    def test_create_open_branch_uses_repository(self):
 
308
        try:
 
309
            repo = self.make_repository('.', shared=True)
 
310
        except errors.IncompatibleFormat:
 
311
            return
 
312
        repo.bzrdir.root_transport.mkdir('child')
 
313
        child_dir = self.bzrdir_format.initialize('child')
 
314
        try:
 
315
            child_branch = self.branch_format.initialize(child_dir)
 
316
        except errors.UninitializableFormat:
 
317
            # branch references are not default init'able.
 
318
            return
 
319
        self.assertEqual(repo.bzrdir.root_transport.base,
 
320
                         child_branch.repository.bzrdir.root_transport.base)
 
321
        child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
 
322
        self.assertEqual(repo.bzrdir.root_transport.base,
 
323
                         child_branch.repository.bzrdir.root_transport.base)
 
324
 
 
325
 
 
326
class ChrootedTests(TestCaseWithBranch):
 
327
    """A support class that provides readonly urls outside the local namespace.
 
328
 
 
329
    This is done by checking if self.transport_server is a MemoryServer. If it
 
330
    is then we are chrooted already, if it is not then an HttpServer is used
 
331
    for readonly urls.
 
332
    """
 
333
 
 
334
    def setUp(self):
 
335
        super(ChrootedTests, self).setUp()
 
336
        if not self.transport_server == MemoryServer:
 
337
            self.transport_readonly_server = HttpServer
224
338
 
225
339
    def test_open_containing(self):
226
340
        self.assertRaises(NotBranchError, Branch.open_containing,
227
 
                          self.get_remote_url(''))
 
341
                          self.get_readonly_url(''))
228
342
        self.assertRaises(NotBranchError, Branch.open_containing,
229
 
                          self.get_remote_url('g/p/q'))
230
 
        b = Branch.initialize(u'.')
231
 
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
343
                          self.get_readonly_url('g/p/q'))
 
344
        branch = self.make_branch('.')
 
345
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
232
346
        self.assertEqual('', relpath)
233
 
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
347
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
234
348
        self.assertEqual('g/p/q', relpath)
235
349
        
236
350
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
315
429
        self.assertEqual(['lw', 'ul'], branch._calls)
316
430
 
317
431
 
318
 
class TestBranchTransaction(TestCaseInTempDir):
 
432
class TestBranchTransaction(TestCaseWithBranch):
319
433
 
320
434
    def setUp(self):
321
435
        super(TestBranchTransaction, self).setUp()
322
 
        self.branch = Branch.initialize(u'.')
 
436
        self.branch = None
323
437
        
324
438
    def test_default_get_transaction(self):
325
439
        """branch.get_transaction on a new branch should give a PassThrough."""
326
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
440
        self.failUnless(isinstance(self.get_branch().get_transaction(),
327
441
                                   transactions.PassThroughTransaction))
328
442
 
329
443
    def test__set_new_transaction(self):
330
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
444
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
331
445
 
332
446
    def test__set_over_existing_transaction_raises(self):
333
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
447
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
334
448
        self.assertRaises(errors.LockError,
335
 
                          self.branch._set_transaction,
 
449
                          self.get_branch()._set_transaction,
336
450
                          transactions.ReadOnlyTransaction())
337
451
 
338
452
    def test_finish_no_transaction_raises(self):
339
 
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
453
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
340
454
 
341
455
    def test_finish_readonly_transaction_works(self):
342
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
343
 
        self.branch._finish_transaction()
344
 
        self.assertEqual(None, self.branch._transaction)
 
456
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
457
        self.get_branch()._finish_transaction()
 
458
        self.assertEqual(None, self.get_branch().control_files._transaction)
345
459
 
346
460
    def test_unlock_calls_finish(self):
347
 
        self.branch.lock_read()
 
461
        self.get_branch().lock_read()
348
462
        transaction = InstrumentedTransaction()
349
 
        self.branch._transaction = transaction
350
 
        self.branch.unlock()
 
463
        self.get_branch().control_files._transaction = transaction
 
464
        self.get_branch().unlock()
351
465
        self.assertEqual(['finish'], transaction.calls)
352
466
 
353
467
    def test_lock_read_acquires_ro_transaction(self):
354
 
        self.branch.lock_read()
355
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
468
        self.get_branch().lock_read()
 
469
        self.failUnless(isinstance(self.get_branch().get_transaction(),
356
470
                                   transactions.ReadOnlyTransaction))
357
 
        self.branch.unlock()
 
471
        self.get_branch().unlock()
358
472
        
359
 
    def test_lock_write_acquires_passthrough_transaction(self):
360
 
        self.branch.lock_write()
 
473
    def test_lock_write_acquires_write_transaction(self):
 
474
        self.get_branch().lock_write()
361
475
        # cannot use get_transaction as it's magic
362
 
        self.failUnless(isinstance(self.branch._transaction,
363
 
                                   transactions.PassThroughTransaction))
364
 
        self.branch.unlock()
365
 
 
366
 
 
367
 
class TestBranchPushLocations(TestCaseInTempDir):
368
 
 
369
 
    def setUp(self):
370
 
        super(TestBranchPushLocations, self).setUp()
371
 
        self.branch = Branch.initialize(u'.')
372
 
        
 
476
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
 
477
                                   transactions.WriteTransaction))
 
478
        self.get_branch().unlock()
 
479
 
 
480
 
 
481
class TestBranchPushLocations(TestCaseWithBranch):
 
482
 
373
483
    def test_get_push_location_unset(self):
374
 
        self.assertEqual(None, self.branch.get_push_location())
 
484
        self.assertEqual(None, self.get_branch().get_push_location())
375
485
 
376
486
    def test_get_push_location_exact(self):
377
 
        self.build_tree(['.bazaar/'])
378
 
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
379
 
                                                       "push_location=foo" %
380
 
                                                       os.getcwdu())
381
 
        self.assertEqual("foo", self.branch.get_push_location())
 
487
        from bzrlib.config import (branches_config_filename,
 
488
                                   ensure_config_dir_exists)
 
489
        ensure_config_dir_exists()
 
490
        fn = branches_config_filename()
 
491
        print >> open(fn, 'wt'), ("[%s]\n"
 
492
                                  "push_location=foo" %
 
493
                                  self.get_branch().base[:-1])
 
494
        self.assertEqual("foo", self.get_branch().get_push_location())
382
495
 
383
496
    def test_set_push_location(self):
384
 
        self.branch.set_push_location('foo')
 
497
        from bzrlib.config import (branches_config_filename,
 
498
                                   ensure_config_dir_exists)
 
499
        ensure_config_dir_exists()
 
500
        fn = branches_config_filename()
 
501
        self.get_branch().set_push_location('foo')
385
502
        self.assertFileEqual("[%s]\n"
386
 
                             "push_location = foo" % os.getcwdu(),
387
 
                             '.bazaar/branches.conf')
 
503
                             "push_location = foo" % self.get_branch().base[:-1],
 
504
                             fn)
388
505
 
389
506
    # TODO RBC 20051029 test getting a push location from a branch in a 
390
507
    # recursive section - that is, it appends the branch name.
 
508
 
 
509
 
 
510
class TestFormat(TestCaseWithBranch):
 
511
    """Tests for the format itself."""
 
512
 
 
513
    def test_format_initialize_find_open(self):
 
514
        # loopback test to check the current format initializes to itself.
 
515
        if not self.branch_format.is_supported():
 
516
            # unsupported formats are not loopback testable
 
517
            # because the default open will not open them and
 
518
            # they may not be initializable.
 
519
            return
 
520
        # supported formats must be able to init and open
 
521
        t = get_transport(self.get_url())
 
522
        readonly_t = get_transport(self.get_readonly_url())
 
523
        made_branch = self.make_branch('.')
 
524
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
 
525
 
 
526
        # find it via bzrdir opening:
 
527
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
528
        direct_opened_branch = opened_control.open_branch()
 
529
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
 
530
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
 
531
        self.failUnless(isinstance(direct_opened_branch._format,
 
532
                        self.branch_format.__class__))
 
533
 
 
534
        # find it via Branch.open
 
535
        opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
 
536
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
 
537
        self.assertEqual(made_branch._format.__class__,
 
538
                         opened_branch._format.__class__)
 
539
        # if it has a unique id string, can we probe for it ?
 
540
        try:
 
541
            self.branch_format.get_format_string()
 
542
        except NotImplementedError:
 
543
            return
 
544
        self.assertEqual(self.branch_format,
 
545
                         bzrlib.branch.BranchFormat.find_format(opened_control))