/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

[merge] update from bzr.dev

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# (C) 2005 Canonical Ltd
 
1
# (C) 2005, 2006 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
17
19
import os
18
20
import sys
19
21
 
 
22
import bzrlib.branch as branch
20
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
21
 
from bzrlib.clone import copy_branch
22
24
from bzrlib.commit import commit
23
25
import bzrlib.errors as errors
24
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
26
from bzrlib.errors import (FileExists,
 
27
                           NoSuchRevision,
 
28
                           NoSuchFile,
 
29
                           UninitializableFormat,
 
30
                           NotBranchError,
 
31
                           )
25
32
import bzrlib.gpg
26
33
from bzrlib.osutils import getcwd
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir
28
 
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
34
from bzrlib.revision import NULL_REVISION
 
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
29
36
from bzrlib.trace import mutter
30
37
import bzrlib.transactions as transactions
31
 
from bzrlib.revision import NULL_REVISION
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
from bzrlib.upgrade import upgrade
 
42
from bzrlib.workingtree import WorkingTree
32
43
 
33
44
# TODO: Make a branch using basis branch, and check that it 
34
45
# doesn't request any files that could have been avoided, by 
35
46
# hooking into the Transport.
36
47
 
37
 
class TestBranch(TestCaseInTempDir):
 
48
 
 
49
class TestCaseWithBranch(TestCaseWithTransport):
 
50
 
 
51
    def setUp(self):
 
52
        super(TestCaseWithBranch, self).setUp()
 
53
        self.branch = None
 
54
 
 
55
    def get_branch(self):
 
56
        if self.branch is None:
 
57
            self.branch = self.make_branch(None)
 
58
        return self.branch
 
59
 
 
60
    def make_branch(self, relpath):
 
61
        try:
 
62
            url = self.get_url(relpath)
 
63
            segments = url.split('/')
 
64
            if segments and segments[-1] not in ('', '.'):
 
65
                parent = '/'.join(segments[:-1])
 
66
                t = get_transport(parent)
 
67
                try:
 
68
                    t.mkdir(segments[-1])
 
69
                except FileExists:
 
70
                    pass
 
71
            return self.branch_format.initialize(url)
 
72
        except UninitializableFormat:
 
73
            raise TestSkipped("Format %s is not initializable.")
 
74
 
 
75
 
 
76
class TestBranch(TestCaseWithBranch):
38
77
 
39
78
    def test_append_revisions(self):
40
79
        """Test appending more than one revision"""
41
 
        br = Branch.initialize(u".")
 
80
        br = self.get_branch()
42
81
        br.append_revision("rev1")
43
82
        self.assertEquals(br.revision_history(), ["rev1",])
44
83
        br.append_revision("rev2", "rev3")
47
86
    def test_fetch_revisions(self):
48
87
        """Test fetch-revision operation."""
49
88
        from bzrlib.fetch import Fetcher
50
 
        os.mkdir('b1')
51
 
        os.mkdir('b2')
52
 
        b1 = Branch.initialize('b1')
53
 
        b2 = Branch.initialize('b2')
 
89
        get_transport(self.get_url()).mkdir('b1')
 
90
        get_transport(self.get_url()).mkdir('b2')
 
91
        b1 = self.make_branch('b1')
 
92
        b2 = self.make_branch('b2')
 
93
        wt = WorkingTree.create(b1, 'b1')
54
94
        file('b1/foo', 'w').write('hello')
55
 
        b1.working_tree().add(['foo'], ['foo-id'])
56
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
95
        wt.add(['foo'], ['foo-id'])
 
96
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
57
97
 
58
98
        mutter('start fetch')
59
99
        f = Fetcher(from_branch=b1, to_branch=b2)
60
100
        eq = self.assertEquals
61
101
        eq(f.count_copied, 1)
62
 
        eq(f.last_revision, 'revision-1')
 
102
        eq(f._last_revision, 'revision-1')
63
103
 
64
 
        rev = b2.get_revision('revision-1')
65
 
        tree = b2.revision_tree('revision-1')
 
104
        rev = b2.repository.get_revision('revision-1')
 
105
        tree = b2.repository.revision_tree('revision-1')
66
106
        eq(tree.get_file_text('foo-id'), 'hello')
67
107
 
68
108
    def test_revision_tree(self):
69
 
        b1 = Branch.initialize(u'.')
70
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
71
 
        tree = b1.revision_tree('revision-1')
72
 
        tree = b1.revision_tree(None)
 
109
        b1 = self.get_branch()
 
110
        wt = WorkingTree.create(b1, '.')
 
111
        wt.commit('lala!', rev_id='revision-1', allow_pointless=True)
 
112
        tree = b1.repository.revision_tree('revision-1')
 
113
        tree = b1.repository.revision_tree(None)
73
114
        self.assertEqual(len(tree.list_files()), 0)
74
 
        tree = b1.revision_tree(NULL_REVISION)
 
115
        tree = b1.repository.revision_tree(NULL_REVISION)
75
116
        self.assertEqual(len(tree.list_files()), 0)
76
117
 
77
 
    def get_unbalanced_branch_pair(self):
 
118
    def get_unbalanced_tree_pair(self):
78
119
        """Return two branches, a and b, with one file in a."""
79
 
        os.mkdir('a')
80
 
        br_a = Branch.initialize("a")
 
120
        get_transport(self.get_url()).mkdir('a')
 
121
        br_a = self.make_branch('a')
 
122
        tree_a = WorkingTree.create(br_a, 'a')
81
123
        file('a/b', 'wb').write('b')
82
 
        br_a.working_tree().add('b')
83
 
        commit(br_a, "silly commit", rev_id='A')
84
 
        os.mkdir('b')
85
 
        br_b = Branch.initialize("b")
86
 
        return br_a, br_b
 
124
        tree_a.add('b')
 
125
        tree_a.commit("silly commit", rev_id='A')
 
126
 
 
127
        get_transport(self.get_url()).mkdir('b')
 
128
        br_b = self.make_branch('b')
 
129
        tree_b = WorkingTree.create(br_b, 'b')
 
130
        return tree_a, tree_b
87
131
 
88
132
    def get_balanced_branch_pair(self):
89
133
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
90
 
        br_a, br_b = self.get_unbalanced_branch_pair()
91
 
        br_a.push_stores(br_b)
92
 
        return br_a, br_b
 
134
        tree_a, tree_b = self.get_unbalanced_tree_pair()
 
135
        tree_a.branch.push_stores(tree_b.branch)
 
136
        return tree_a, tree_b
93
137
 
94
138
    def test_push_stores(self):
95
139
        """Copy the stores from one branch to another"""
96
 
        br_a, br_b = self.get_unbalanced_branch_pair()
 
140
        tree_a, tree_b = self.get_unbalanced_tree_pair()
 
141
        br_a = tree_a.branch
 
142
        br_b = tree_b.branch
97
143
        # ensure the revision is missing.
98
 
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
144
        self.assertRaises(NoSuchRevision, br_b.repository.get_revision, 
99
145
                          br_a.revision_history()[0])
100
146
        br_a.push_stores(br_b)
101
147
        # check that b now has all the data from a's first commit.
102
 
        rev = br_b.get_revision(br_a.revision_history()[0])
103
 
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
148
        rev = br_b.repository.get_revision(br_a.revision_history()[0])
 
149
        tree = br_b.repository.revision_tree(br_a.revision_history()[0])
104
150
        for file_id in tree:
105
151
            if tree.inventory[file_id].kind == "file":
106
152
                tree.get_file(file_id).read()
107
 
        return br_a, br_b
108
153
 
109
 
    def test_copy_branch(self):
 
154
    def test_clone_branch(self):
110
155
        """Copy the stores from one branch to another"""
111
 
        br_a, br_b = self.get_balanced_branch_pair()
112
 
        commit(br_b, "silly commit")
 
156
        tree_a, tree_b = self.get_balanced_branch_pair()
 
157
        tree_b.commit("silly commit")
113
158
        os.mkdir('c')
114
 
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
115
 
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
159
        br_c = tree_a.branch.clone('c', basis_branch=tree_b.branch)
 
160
        self.assertEqual(tree_a.branch.revision_history(),
 
161
                         br_c.revision_history())
116
162
 
117
 
    def test_copy_partial(self):
 
163
    def test_clone_partial(self):
118
164
        """Copy only part of the history of a branch."""
119
 
        self.build_tree(['a/', 'a/one'])
120
 
        br_a = Branch.initialize('a')
121
 
        br_a.working_tree().add(['one'])
122
 
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
 
165
        get_transport(self.get_url()).mkdir('a')
 
166
        br_a = self.make_branch('a')
 
167
        wt = WorkingTree.create(br_a, "a")
 
168
        self.build_tree(['a/one'])
 
169
        wt.add(['one'])
 
170
        wt.commit('commit one', rev_id='u@d-1')
123
171
        self.build_tree(['a/two'])
124
 
        br_a.working_tree().add(['two'])
125
 
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
126
 
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
172
        wt.add(['two'])
 
173
        wt.commit('commit two', rev_id='u@d-2')
 
174
        br_b = br_a.clone('b', revision='u@d-1')
127
175
        self.assertEqual(br_b.last_revision(), 'u@d-1')
128
176
        self.assertTrue(os.path.exists('b/one'))
129
177
        self.assertFalse(os.path.exists('b/two'))
130
178
        
131
179
    def test_record_initial_ghost_merge(self):
132
180
        """A pending merge with no revision present is still a merge."""
133
 
        branch = Branch.initialize(u'.')
134
 
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
135
 
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
136
 
        rev = branch.get_revision(branch.last_revision())
 
181
        branch = self.get_branch()
 
182
        wt = WorkingTree.create(branch, ".")
 
183
        wt.add_pending_merge('non:existent@rev--ision--0--2')
 
184
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
 
185
        rev = branch.repository.get_revision(branch.last_revision())
137
186
        self.assertEqual(len(rev.parent_ids), 1)
138
187
        # parent_sha1s is not populated now, WTF. rbc 20051003
139
188
        self.assertEqual(len(rev.parent_sha1s), 0)
140
189
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
141
190
 
142
191
    def test_bad_revision(self):
143
 
        branch = Branch.initialize(u'.')
144
 
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
192
        self.assertRaises(errors.InvalidRevisionId,
 
193
                          self.get_branch().repository.get_revision,
 
194
                          None)
145
195
 
146
196
# TODO 20051003 RBC:
147
197
# compare the gpg-to-sign info for a commit with a ghost and 
150
200
        
151
201
    def test_pending_merges(self):
152
202
        """Tracking pending-merged revisions."""
153
 
        b = Branch.initialize(u'.')
154
 
        wt = b.working_tree()
 
203
        b = self.get_branch()
 
204
        wt = WorkingTree.create(b, '.')
155
205
        self.assertEquals(wt.pending_merges(), [])
156
206
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
207
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
161
211
        self.assertEquals(wt.pending_merges(),
162
212
                          ['foo@azkhazan-123123-abcabc',
163
213
                           'wibble@fofof--20050401--1928390812'])
164
 
        b.working_tree().commit("commit from base with two merges")
165
 
        rev = b.get_revision(b.revision_history()[0])
 
214
        wt.commit("commit from base with two merges")
 
215
        rev = b.repository.get_revision(b.revision_history()[0])
166
216
        self.assertEquals(len(rev.parent_ids), 2)
167
217
        self.assertEquals(rev.parent_ids[0],
168
218
                          'foo@azkhazan-123123-abcabc')
172
222
        self.assertEquals(wt.pending_merges(), [])
173
223
 
174
224
    def test_sign_existing_revision(self):
175
 
        branch = Branch.initialize(u'.')
176
 
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
225
        branch = self.get_branch()
 
226
        wt = WorkingTree.create(branch, ".")
 
227
        wt.commit("base", allow_pointless=True, rev_id='A')
177
228
        from bzrlib.testament import Testament
178
 
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
179
 
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
180
 
                         branch.revision_store.get('A', 'sig').read())
 
229
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
 
230
        branch.repository.sign_revision('A', strategy)
 
231
        self.assertEqual(Testament.from_revision(branch.repository, 
 
232
                         'A').as_short_text(),
 
233
                         branch.repository.revision_store.get('A', 
 
234
                         'sig').read())
181
235
 
182
236
    def test_store_signature(self):
183
 
        branch = Branch.initialize(u'.')
184
 
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
185
 
                                        'FOO', 'A')
186
 
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
187
 
 
188
 
    def test__relcontrolfilename(self):
189
 
        branch = Branch.initialize(u'.')
190
 
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
191
 
        
192
 
    def test__relcontrolfilename_empty(self):
193
 
        branch = Branch.initialize(u'.')
194
 
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
237
        branch = self.get_branch()
 
238
        branch.repository.store_revision_signature(
 
239
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
240
        self.assertEqual('FOO', 
 
241
                         branch.repository.revision_store.get('A', 
 
242
                         'sig').read())
 
243
 
 
244
    def test_branch_keeps_signatures(self):
 
245
        wt = self.make_branch_and_tree('source')
 
246
        wt.commit('A', allow_pointless=True, rev_id='A')
 
247
        wt.branch.repository.sign_revision('A',
 
248
            bzrlib.gpg.LoopbackGPGStrategy(None))
 
249
        #FIXME: clone should work to urls,
 
250
        # wt.clone should work to disks.
 
251
        self.build_tree(['target/'])
 
252
        b2 = wt.branch.clone('target')
 
253
        self.assertEqual(wt.branch.repository.revision_store.get('A', 
 
254
                            'sig').read(),
 
255
                         b2.repository.revision_store.get('A', 
 
256
                            'sig').read())
 
257
 
 
258
    def test_upgrade_preserves_signatures(self):
 
259
        # this is in the current test format
 
260
        wt = self.make_branch_and_tree('source')
 
261
        wt.commit('A', allow_pointless=True, rev_id='A')
 
262
        wt.branch.repository.sign_revision('A',
 
263
            bzrlib.gpg.LoopbackGPGStrategy(None))
 
264
        old_signature = wt.branch.repository.revision_store.get('A',
 
265
            'sig').read()
 
266
        upgrade(wt.basedir)
 
267
        wt = WorkingTree(wt.basedir)
 
268
        new_signature = wt.branch.repository.revision_store.get('A',
 
269
            'sig').read()
 
270
        self.assertEqual(old_signature, new_signature)
195
271
 
196
272
    def test_nicks(self):
197
273
        """Branch nicknames"""
198
 
        os.mkdir('bzr.dev')
199
 
        branch = Branch.initialize('bzr.dev')
 
274
        t = get_transport(self.get_url())
 
275
        t.mkdir('bzr.dev')
 
276
        branch = self.make_branch('bzr.dev')
200
277
        self.assertEqual(branch.nick, 'bzr.dev')
201
 
        os.rename('bzr.dev', 'bzr.ab')
202
 
        branch = Branch.open('bzr.ab')
 
278
        t.move('bzr.dev', 'bzr.ab')
 
279
        branch = Branch.open(self.get_url('bzr.ab'))
203
280
        self.assertEqual(branch.nick, 'bzr.ab')
204
281
        branch.nick = "Aaron's branch"
205
282
        branch.nick = "Aaron's branch"
206
 
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
283
        self.failUnless(
 
284
            t.has(
 
285
                t.relpath(
 
286
                    branch.control_files.controlfilename("branch.conf")
 
287
                    )
 
288
                )
 
289
            )
207
290
        self.assertEqual(branch.nick, "Aaron's branch")
208
 
        os.rename('bzr.ab', 'integration')
209
 
        branch = Branch.open('integration')
 
291
        t.move('bzr.ab', 'integration')
 
292
        branch = Branch.open(self.get_url('integration'))
210
293
        self.assertEqual(branch.nick, "Aaron's branch")
211
294
        branch.nick = u"\u1234"
212
295
        self.assertEqual(branch.nick, u"\u1234")
213
296
 
214
297
    def test_commit_nicks(self):
215
298
        """Nicknames are committed to the revision"""
216
 
        os.mkdir('bzr.dev')
217
 
        branch = Branch.initialize('bzr.dev')
 
299
        get_transport(self.get_url()).mkdir('bzr.dev')
 
300
        branch = self.make_branch('bzr.dev')
218
301
        branch.nick = "My happy branch"
219
 
        branch.working_tree().commit('My commit respect da nick.')
220
 
        committed = branch.get_revision(branch.last_revision())
 
302
        WorkingTree.create(branch, 'bzr.dev').commit('My commit respect da nick.')
 
303
        committed = branch.repository.get_revision(branch.last_revision())
221
304
        self.assertEqual(committed.properties["branch-nick"], 
222
305
                         "My happy branch")
223
306
 
224
 
 
225
 
class TestRemote(TestCaseWithWebserver):
 
307
    def test_no_ancestry_weave(self):
 
308
        # We no longer need to create the ancestry.weave file
 
309
        # since it is *never* used.
 
310
        branch = Branch.create('.')
 
311
        self.failIfExists('.bzr/ancestry.weave')
 
312
 
 
313
 
 
314
class ChrootedTests(TestCaseWithBranch):
 
315
    """A support class that provides readonly urls outside the local namespace.
 
316
 
 
317
    This is done by checking if self.transport_server is a MemoryServer. If it
 
318
    is, then we are chrooted already; if it is not, then an HttpServer is used
 
319
    for readonly urls.
 
320
    """
 
321
 
 
322
    def setUp(self):
 
323
        super(ChrootedTests, self).setUp()
 
324
        if not self.transport_server == MemoryServer:
 
325
            self.transport_readonly_server = HttpServer
226
326
 
227
327
    def test_open_containing(self):
228
328
        self.assertRaises(NotBranchError, Branch.open_containing,
229
 
                          self.get_remote_url(''))
 
329
                          self.get_readonly_url(''))
230
330
        self.assertRaises(NotBranchError, Branch.open_containing,
231
 
                          self.get_remote_url('g/p/q'))
232
 
        b = Branch.initialize(u'.')
233
 
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
331
                          self.get_readonly_url('g/p/q'))
 
332
        try:
 
333
            branch = self.branch_format.initialize(self.get_url())
 
334
        except UninitializableFormat:
 
335
            raise TestSkipped("Format %s is not initializable.")
 
336
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
234
337
        self.assertEqual('', relpath)
235
 
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
338
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
236
339
        self.assertEqual('g/p/q', relpath)
237
340
        
238
341
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
317
420
        self.assertEqual(['lw', 'ul'], branch._calls)
318
421
 
319
422
 
320
 
class TestBranchTransaction(TestCaseInTempDir):
 
423
class TestBranchTransaction(TestCaseWithBranch):
321
424
 
322
425
    def setUp(self):
323
426
        super(TestBranchTransaction, self).setUp()
324
 
        self.branch = Branch.initialize(u'.')
 
427
        self.branch = None
325
428
        
326
429
    def test_default_get_transaction(self):
327
430
        """branch.get_transaction on a new branch should give a PassThrough."""
328
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
431
        self.failUnless(isinstance(self.get_branch().get_transaction(),
329
432
                                   transactions.PassThroughTransaction))
330
433
 
331
434
    def test__set_new_transaction(self):
332
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
435
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
333
436
 
334
437
    def test__set_over_existing_transaction_raises(self):
335
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
438
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
336
439
        self.assertRaises(errors.LockError,
337
 
                          self.branch._set_transaction,
 
440
                          self.get_branch()._set_transaction,
338
441
                          transactions.ReadOnlyTransaction())
339
442
 
340
443
    def test_finish_no_transaction_raises(self):
341
 
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
444
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
342
445
 
343
446
    def test_finish_readonly_transaction_works(self):
344
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
345
 
        self.branch._finish_transaction()
346
 
        self.assertEqual(None, self.branch._transaction)
 
447
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
448
        self.get_branch()._finish_transaction()
 
449
        self.assertEqual(None, self.get_branch().control_files._transaction)
347
450
 
348
451
    def test_unlock_calls_finish(self):
349
 
        self.branch.lock_read()
 
452
        self.get_branch().lock_read()
350
453
        transaction = InstrumentedTransaction()
351
 
        self.branch._transaction = transaction
352
 
        self.branch.unlock()
 
454
        self.get_branch().control_files._transaction = transaction
 
455
        self.get_branch().unlock()
353
456
        self.assertEqual(['finish'], transaction.calls)
354
457
 
355
458
    def test_lock_read_acquires_ro_transaction(self):
356
 
        self.branch.lock_read()
357
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
459
        self.get_branch().lock_read()
 
460
        self.failUnless(isinstance(self.get_branch().get_transaction(),
358
461
                                   transactions.ReadOnlyTransaction))
359
 
        self.branch.unlock()
 
462
        self.get_branch().unlock()
360
463
        
361
464
    def test_lock_write_acquires_passthrough_transaction(self):
362
 
        self.branch.lock_write()
 
465
        self.get_branch().lock_write()
363
466
        # cannot use get_transaction as it's magic
364
 
        self.failUnless(isinstance(self.branch._transaction,
 
467
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
365
468
                                   transactions.PassThroughTransaction))
366
 
        self.branch.unlock()
367
 
 
368
 
 
369
 
class TestBranchPushLocations(TestCaseInTempDir):
370
 
 
371
 
    def setUp(self):
372
 
        super(TestBranchPushLocations, self).setUp()
373
 
        self.branch = Branch.initialize(u'.')
374
 
        
 
469
        self.get_branch().unlock()
 
470
 
 
471
 
 
472
class TestBranchPushLocations(TestCaseWithBranch):
 
473
 
375
474
    def test_get_push_location_unset(self):
376
 
        self.assertEqual(None, self.branch.get_push_location())
 
475
        self.assertEqual(None, self.get_branch().get_push_location())
377
476
 
378
477
    def test_get_push_location_exact(self):
379
478
        from bzrlib.config import (branches_config_filename,
382
481
        fn = branches_config_filename()
383
482
        print >> open(fn, 'wt'), ("[%s]\n"
384
483
                                  "push_location=foo" %
385
 
                                  getcwd())
386
 
        self.assertEqual("foo", self.branch.get_push_location())
 
484
                                  self.get_branch().base[:-1])
 
485
        self.assertEqual("foo", self.get_branch().get_push_location())
387
486
 
388
487
    def test_set_push_location(self):
389
488
        from bzrlib.config import (branches_config_filename,
390
489
                                   ensure_config_dir_exists)
391
490
        ensure_config_dir_exists()
392
491
        fn = branches_config_filename()
393
 
        self.branch.set_push_location('foo')
 
492
        self.get_branch().set_push_location('foo')
394
493
        self.assertFileEqual("[%s]\n"
395
 
                             "push_location = foo" % getcwd(),
 
494
                             "push_location = foo" % self.get_branch().base[:-1],
396
495
                             fn)
397
496
 
398
497
    # TODO RBC 20051029 test getting a push location from a branch in a 
399
498
    # recursive section - that is, it appends the branch name.
 
499
 
 
500
 
 
501
class TestFormat(TestCaseWithBranch):
 
502
    """Tests for the format itself."""
 
503
 
 
504
    def test_format_initialize_find_open(self):
 
505
        # loopback test to check the current format initializes to itself.
 
506
        if not self.branch_format.is_supported():
 
507
            # unsupported formats are not loopback testable
 
508
            # because the default open will not open them and
 
509
            # they may not be initializable.
 
510
            return
 
511
        # supported formats must be able to init and open
 
512
        t = get_transport(self.get_url())
 
513
        readonly_t = get_transport(self.get_readonly_url())
 
514
        made_branch = self.branch_format.initialize(t.base)
 
515
        self.failUnless(isinstance(made_branch, branch.Branch))
 
516
        self.assertEqual(self.branch_format,
 
517
                         branch.BzrBranchFormat.find_format(readonly_t))
 
518
        direct_opened_branch = self.branch_format.open(readonly_t)
 
519
        opened_branch = branch.Branch.open(t.base)
 
520
        self.assertEqual(made_branch._branch_format,
 
521
                         opened_branch._branch_format)
 
522
        self.assertEqual(direct_opened_branch._branch_format,
 
523
                         opened_branch._branch_format)
 
524
        self.failUnless(isinstance(opened_branch, branch.Branch))
 
525
 
 
526
    def test_open_not_branch(self):
 
527
        self.assertRaises(NoSuchFile,
 
528
                          self.branch_format.open,
 
529
                          get_transport(self.get_readonly_url()))