/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: Robert Collins
  • Date: 2008-08-20 02:07:36 UTC
  • mfrom: (3640 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3682.
  • Revision ID: robertc@robertcollins.net-20080820020736-g2xe4921zzxtymle
Merge bzr.dev

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# (C) 2005, 2006 Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
19
import os
20
20
import sys
21
21
 
22
 
import bzrlib.branch
23
 
import bzrlib.bzrdir as bzrdir
 
22
from bzrlib import (
 
23
    branch,
 
24
    bzrdir,
 
25
    errors,
 
26
    gpg,
 
27
    urlutils,
 
28
    transactions,
 
29
    remote,
 
30
    repository,
 
31
    tests,
 
32
    )
24
33
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
 
from bzrlib.commit import commit
26
 
import bzrlib.errors as errors
 
34
from bzrlib.delta import TreeDelta
27
35
from bzrlib.errors import (FileExists,
28
36
                           NoSuchRevision,
29
37
                           NoSuchFile,
30
38
                           UninitializableFormat,
31
39
                           NotBranchError,
32
40
                           )
33
 
import bzrlib.gpg
34
41
from bzrlib.osutils import getcwd
 
42
import bzrlib.revision
 
43
from bzrlib.symbol_versioning import deprecated_in
35
44
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
45
from bzrlib.tests.branch_implementations import TestCaseWithBranch
 
46
from bzrlib.tests.http_server import HttpServer
37
47
from bzrlib.trace import mutter
38
 
import bzrlib.transactions as transactions
39
48
from bzrlib.transport import get_transport
40
 
from bzrlib.transport.http import HttpServer
41
49
from bzrlib.transport.memory import MemoryServer
42
50
from bzrlib.upgrade import upgrade
43
51
from bzrlib.workingtree import WorkingTree
44
52
 
45
53
 
46
 
# TODO: Make a branch using basis branch, and check that it 
47
 
# doesn't request any files that could have been avoided, by 
48
 
# hooking into the Transport.
49
 
 
50
 
 
51
 
class TestCaseWithBranch(TestCaseWithBzrDir):
52
 
 
53
 
    def setUp(self):
54
 
        super(TestCaseWithBranch, self).setUp()
55
 
        self.branch = None
56
 
 
57
 
    def get_branch(self):
58
 
        if self.branch is None:
59
 
            self.branch = self.make_branch('')
60
 
        return self.branch
61
 
 
62
 
    def make_branch(self, relpath, format=None):
63
 
        repo = self.make_repository(relpath, format=format)
64
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
65
 
        # Skipped is the wrong exception to raise.
66
 
        try:
67
 
            return self.branch_format.initialize(repo.bzrdir)
68
 
        except errors.UninitializableFormat:
69
 
            raise TestSkipped('Uninitializable branch format')
70
 
 
71
 
    def make_repository(self, relpath, shared=False, format=None):
72
 
        made_control = self.make_bzrdir(relpath, format=format)
73
 
        return made_control.create_repository(shared=shared)
74
 
 
75
 
 
76
54
class TestBranch(TestCaseWithBranch):
77
55
 
78
 
    def test_append_revisions(self):
79
 
        """Test appending more than one revision"""
 
56
    def test_create_tree_with_merge(self):
 
57
        tree = self.create_tree_with_merge()
 
58
        tree.lock_read()
 
59
        self.addCleanup(tree.unlock)
 
60
        graph = tree.branch.repository.get_graph()
 
61
        ancestry_graph = graph.get_parent_map(
 
62
            tree.branch.repository.all_revision_ids())
 
63
        self.assertEqual({'rev-1':('null:',),
 
64
                          'rev-2':('rev-1', ),
 
65
                          'rev-1.1.1':('rev-1', ),
 
66
                          'rev-3':('rev-2', 'rev-1.1.1', ),
 
67
                         }, ancestry_graph)
 
68
 
 
69
    def test_revision_ids_are_utf8(self):
 
70
        wt = self.make_branch_and_tree('tree')
 
71
        wt.commit('f', rev_id='rev1')
 
72
        wt.commit('f', rev_id='rev2')
 
73
        wt.commit('f', rev_id='rev3')
 
74
 
80
75
        br = self.get_branch()
81
 
        br.append_revision("rev1")
82
 
        self.assertEquals(br.revision_history(), ["rev1",])
83
 
        br.append_revision("rev2", "rev3")
84
 
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
 
76
        br.fetch(wt.branch)
 
77
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
 
78
        rh = br.revision_history()
 
79
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
 
80
        for revision_id in rh:
 
81
            self.assertIsInstance(revision_id, str)
 
82
        last = br.last_revision()
 
83
        self.assertEqual('rev3', last)
 
84
        self.assertIsInstance(last, str)
 
85
        revno, last = br.last_revision_info()
 
86
        self.assertEqual(3, revno)
 
87
        self.assertEqual('rev3', last)
 
88
        self.assertIsInstance(last, str)
85
89
 
86
90
    def test_fetch_revisions(self):
87
91
        """Test fetch-revision operation."""
88
 
        get_transport(self.get_url()).mkdir('b1')
89
 
        get_transport(self.get_url()).mkdir('b2')
90
92
        wt = self.make_branch_and_tree('b1')
91
93
        b1 = wt.branch
92
 
        b2 = self.make_branch('b2')
93
 
        file('b1/foo', 'w').write('hello')
 
94
        self.build_tree_contents([('b1/foo', 'hello')])
94
95
        wt.add(['foo'], ['foo-id'])
95
96
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
96
97
 
97
 
        mutter('start fetch')
 
98
        b2 = self.make_branch('b2')
98
99
        self.assertEqual((1, []), b2.fetch(b1))
99
100
 
100
101
        rev = b2.repository.get_revision('revision-1')
101
102
        tree = b2.repository.revision_tree('revision-1')
 
103
        tree.lock_read()
 
104
        self.addCleanup(tree.unlock)
102
105
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
103
106
 
 
107
    def test_get_revision_delta(self):
 
108
        tree_a = self.make_branch_and_tree('a')
 
109
        self.build_tree(['a/foo'])
 
110
        tree_a.add('foo', 'file1')
 
111
        tree_a.commit('rev1', rev_id='rev1')
 
112
        self.build_tree(['a/vla'])
 
113
        tree_a.add('vla', 'file2')
 
114
        tree_a.commit('rev2', rev_id='rev2')
 
115
 
 
116
        delta = tree_a.branch.get_revision_delta(1)
 
117
        self.assertIsInstance(delta, TreeDelta)
 
118
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
 
119
        delta = tree_a.branch.get_revision_delta(2)
 
120
        self.assertIsInstance(delta, TreeDelta)
 
121
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
 
122
 
104
123
    def get_unbalanced_tree_pair(self):
105
124
        """Return two branches, a and b, with one file in a."""
106
 
        get_transport(self.get_url()).mkdir('a')
107
125
        tree_a = self.make_branch_and_tree('a')
108
 
        file('a/b', 'wb').write('b')
 
126
        self.build_tree_contents([('a/b', 'b')])
109
127
        tree_a.add('b')
110
128
        tree_a.commit("silly commit", rev_id='A')
111
129
 
112
 
        get_transport(self.get_url()).mkdir('b')
113
130
        tree_b = self.make_branch_and_tree('b')
114
131
        return tree_a, tree_b
115
132
 
119
136
        tree_b.branch.repository.fetch(tree_a.branch.repository)
120
137
        return tree_a, tree_b
121
138
 
122
 
    def test_clone_branch(self):
123
 
        """Copy the stores from one branch to another"""
124
 
        tree_a, tree_b = self.get_balanced_branch_pair()
125
 
        tree_b.commit("silly commit")
126
 
        os.mkdir('c')
127
 
        # this fails to test that the history from a was not used.
128
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
129
 
        self.assertEqual(tree_a.branch.revision_history(),
130
 
                         dir_c.open_branch().revision_history())
131
 
 
132
139
    def test_clone_partial(self):
133
140
        """Copy only part of the history of a branch."""
134
141
        # TODO: RBC 20060208 test with a revision not on revision-history.
135
142
        #       what should that behaviour be ? Emailed the list.
136
 
        wt_a = self.make_branch_and_tree('a')
137
 
        self.build_tree(['a/one'])
138
 
        wt_a.add(['one'])
139
 
        wt_a.commit('commit one', rev_id='1')
140
 
        self.build_tree(['a/two'])
141
 
        wt_a.add(['two'])
142
 
        wt_a.commit('commit two', rev_id='2')
143
 
        repo_b = self.make_repository('b')
144
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
145
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
146
 
        self.assertEqual(br_b.last_revision(), '1')
147
 
 
148
 
    def test_sprout_partial(self):
149
 
        # test sprouting with a prefix of the revision-history.
150
 
        # also needs not-on-revision-history behaviour defined.
151
 
        wt_a = self.make_branch_and_tree('a')
152
 
        self.build_tree(['a/one'])
153
 
        wt_a.add(['one'])
154
 
        wt_a.commit('commit one', rev_id='1')
155
 
        self.build_tree(['a/two'])
156
 
        wt_a.add(['two'])
157
 
        wt_a.commit('commit two', rev_id='2')
158
 
        repo_b = self.make_repository('b')
159
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
160
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
161
 
        self.assertEqual(br_b.last_revision(), '1')
 
143
        # First, make a branch with two commits.
 
144
        wt_a = self.make_branch_and_tree('a')
 
145
        self.build_tree(['a/one'])
 
146
        wt_a.add(['one'])
 
147
        wt_a.commit('commit one', rev_id='1')
 
148
        self.build_tree(['a/two'])
 
149
        wt_a.add(['two'])
 
150
        wt_a.commit('commit two', rev_id='2')
 
151
        # Now make a copy of the repository.
 
152
        repo_b = self.make_repository('b')
 
153
        wt_a.branch.repository.copy_content_into(repo_b)
 
154
        # wt_a might be a lightweight checkout, so get a hold of the actual
 
155
        # branch (because you can't do a partial clone of a lightweight
 
156
        # checkout).
 
157
        branch = wt_a.branch.bzrdir.open_branch()
 
158
        # Then make a branch where the new repository is, but specify a revision
 
159
        # ID.  The new branch's history will stop at the specified revision.
 
160
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
 
161
        self.assertEqual('1', br_b.last_revision())
 
162
 
 
163
    def get_parented_branch(self):
 
164
        wt_a = self.make_branch_and_tree('a')
 
165
        self.build_tree(['a/one'])
 
166
        wt_a.add(['one'])
 
167
        wt_a.commit('commit one', rev_id='1')
 
168
 
 
169
        branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
 
170
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
 
171
        return branch_b
162
172
 
163
173
    def test_clone_branch_nickname(self):
164
174
        # test the nick name is preserved always
165
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
175
        raise TestSkipped('XXX branch cloning is not yet tested.')
166
176
 
167
177
    def test_clone_branch_parent(self):
168
178
        # test the parent is preserved always
169
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
170
 
        
171
 
    def test_sprout_branch_nickname(self):
172
 
        # test the nick name is reset always
173
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
174
 
 
175
 
    def test_sprout_branch_parent(self):
176
 
        source = self.make_branch('source')
177
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
178
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
179
 
        
180
 
    def test_record_initial_ghost_merge(self):
181
 
        """A pending merge with no revision present is still a merge."""
 
179
        branch_b = self.get_parented_branch()
 
180
        repo_c = self.make_repository('c')
 
181
        branch_b.repository.copy_content_into(repo_c)
 
182
        branch_c = branch_b.clone(repo_c.bzrdir)
 
183
        self.assertNotEqual(None, branch_c.get_parent())
 
184
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
 
185
 
 
186
        # We can also set a specific parent, and it should be honored
 
187
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
 
188
        branch_b.set_parent(random_parent)
 
189
        repo_d = self.make_repository('d')
 
190
        branch_b.repository.copy_content_into(repo_d)
 
191
        branch_d = branch_b.clone(repo_d.bzrdir)
 
192
        self.assertEqual(random_parent, branch_d.get_parent())
 
193
 
 
194
    def test_submit_branch(self):
 
195
        """Submit location can be queried and set"""
 
196
        branch = self.make_branch('branch')
 
197
        self.assertEqual(branch.get_submit_branch(), None)
 
198
        branch.set_submit_branch('sftp://example.com')
 
199
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
 
200
        branch.set_submit_branch('sftp://example.net')
 
201
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
202
        
 
203
    def test_public_branch(self):
 
204
        """public location can be queried and set"""
 
205
        branch = self.make_branch('branch')
 
206
        self.assertEqual(branch.get_public_branch(), None)
 
207
        branch.set_public_branch('sftp://example.com')
 
208
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
 
209
        branch.set_public_branch('sftp://example.net')
 
210
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
 
211
        branch.set_public_branch(None)
 
212
        self.assertEqual(branch.get_public_branch(), None)
 
213
 
 
214
    def test_record_initial_ghost(self):
 
215
        """Branches should support having ghosts."""
182
216
        wt = self.make_branch_and_tree('.')
183
 
        branch = wt.branch
184
 
        wt.add_pending_merge('non:existent@rev--ision--0--2')
185
 
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
186
 
        rev = branch.repository.get_revision(branch.last_revision())
187
 
        self.assertEqual(len(rev.parent_ids), 1)
 
217
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
 
218
            allow_leftmost_as_ghost=True)
 
219
        self.assertEqual(['non:existent@rev--ision--0--2'],
 
220
            wt.get_parent_ids())
 
221
        rev_id = wt.commit('commit against a ghost first parent.')
 
222
        rev = wt.branch.repository.get_revision(rev_id)
 
223
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
188
224
        # parent_sha1s is not populated now, WTF. rbc 20051003
189
225
        self.assertEqual(len(rev.parent_sha1s), 0)
190
 
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
 
226
 
 
227
    def test_record_two_ghosts(self):
 
228
        """Recording with all ghosts works."""
 
229
        wt = self.make_branch_and_tree('.')
 
230
        wt.set_parent_ids([
 
231
                'foo@azkhazan-123123-abcabc',
 
232
                'wibble@fofof--20050401--1928390812',
 
233
            ],
 
234
            allow_leftmost_as_ghost=True)
 
235
        rev_id = wt.commit("commit from ghost base with one merge")
 
236
        # the revision should have been committed with two parents
 
237
        rev = wt.branch.repository.get_revision(rev_id)
 
238
        self.assertEqual(['foo@azkhazan-123123-abcabc',
 
239
            'wibble@fofof--20050401--1928390812'],
 
240
            rev.parent_ids)
191
241
 
192
242
    def test_bad_revision(self):
193
243
        self.assertRaises(errors.InvalidRevisionId,
199
249
#     an identical tree without a ghost
200
250
# fetch missing should rewrite the TOC of weaves to list newly available parents.
201
251
        
202
 
    def test_pending_merges(self):
203
 
        """Tracking pending-merged revisions."""
204
 
        wt = self.make_branch_and_tree('.')
205
 
        b = wt.branch
206
 
        self.assertEquals(wt.pending_merges(), [])
207
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
208
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
209
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
210
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
211
 
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
212
 
        self.assertEquals(wt.pending_merges(),
213
 
                          ['foo@azkhazan-123123-abcabc',
214
 
                           'wibble@fofof--20050401--1928390812'])
215
 
        wt.commit("commit from base with two merges")
216
 
        rev = b.repository.get_revision(b.revision_history()[0])
217
 
        self.assertEquals(len(rev.parent_ids), 2)
218
 
        self.assertEquals(rev.parent_ids[0],
219
 
                          'foo@azkhazan-123123-abcabc')
220
 
        self.assertEquals(rev.parent_ids[1],
221
 
                           'wibble@fofof--20050401--1928390812')
222
 
        # list should be cleared when we do a commit
223
 
        self.assertEquals(wt.pending_merges(), [])
224
 
 
225
252
    def test_sign_existing_revision(self):
226
253
        wt = self.make_branch_and_tree('.')
227
254
        branch = wt.branch
228
255
        wt.commit("base", allow_pointless=True, rev_id='A')
229
256
        from bzrlib.testament import Testament
230
 
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
 
257
        strategy = gpg.LoopbackGPGStrategy(None)
 
258
        branch.repository.lock_write()
 
259
        branch.repository.start_write_group()
231
260
        branch.repository.sign_revision('A', strategy)
232
 
        self.assertEqual(Testament.from_revision(branch.repository, 
233
 
                         'A').as_short_text(),
 
261
        branch.repository.commit_write_group()
 
262
        branch.repository.unlock()
 
263
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
 
264
                         Testament.from_revision(branch.repository,
 
265
                         'A').as_short_text() +
 
266
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
234
267
                         branch.repository.get_signature_text('A'))
235
268
 
236
269
    def test_store_signature(self):
237
270
        wt = self.make_branch_and_tree('.')
238
271
        branch = wt.branch
239
 
        branch.repository.store_revision_signature(
240
 
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
272
        branch.lock_write()
 
273
        try:
 
274
            branch.repository.start_write_group()
 
275
            try:
 
276
                branch.repository.store_revision_signature(
 
277
                    gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
278
            except:
 
279
                branch.repository.abort_write_group()
 
280
                raise
 
281
            else:
 
282
                branch.repository.commit_write_group()
 
283
        finally:
 
284
            branch.unlock()
 
285
        # A signature without a revision should not be accessible.
241
286
        self.assertRaises(errors.NoSuchRevision,
242
287
                          branch.repository.has_signature_for_revision_id,
243
288
                          'A')
244
289
        wt.commit("base", allow_pointless=True, rev_id='A')
245
 
        self.assertEqual('FOO', 
 
290
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
 
291
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
246
292
                         branch.repository.get_signature_text('A'))
247
293
 
248
294
    def test_branch_keeps_signatures(self):
249
295
        wt = self.make_branch_and_tree('source')
250
296
        wt.commit('A', allow_pointless=True, rev_id='A')
251
 
        wt.branch.repository.sign_revision('A',
252
 
            bzrlib.gpg.LoopbackGPGStrategy(None))
 
297
        repo = wt.branch.repository
 
298
        repo.lock_write()
 
299
        repo.start_write_group()
 
300
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
 
301
        repo.commit_write_group()
 
302
        repo.unlock()
253
303
        #FIXME: clone should work to urls,
254
304
        # wt.clone should work to disks.
255
305
        self.build_tree(['target/'])
256
 
        d2 = wt.bzrdir.clone('target')
257
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
 
306
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
 
307
        self.assertEqual(repo.get_signature_text('A'),
258
308
                         d2.open_repository().get_signature_text('A'))
259
309
 
 
310
    def test_missing_revisions(self):
 
311
        t1 = self.make_branch_and_tree('b1')
 
312
        rev1 = t1.commit('one')
 
313
        t2 = t1.bzrdir.sprout('b2').open_workingtree()
 
314
        rev2 = t1.commit('two')
 
315
        rev3 = t1.commit('three')
 
316
 
 
317
        self.assertEqual([rev2, rev3],
 
318
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
319
            t2.branch.missing_revisions, t1.branch))
 
320
 
 
321
        self.assertEqual([],
 
322
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
323
            t2.branch.missing_revisions, t1.branch, stop_revision=1))
 
324
        self.assertEqual([rev2],
 
325
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
326
            t2.branch.missing_revisions, t1.branch, stop_revision=2))
 
327
        self.assertEqual([rev2, rev3],
 
328
            self.applyDeprecated(deprecated_in((1, 6, 0)),
 
329
            t2.branch.missing_revisions, t1.branch, stop_revision=3))
 
330
 
 
331
        self.assertRaises(errors.NoSuchRevision,
 
332
            self.applyDeprecated, deprecated_in((1, 6, 0)),
 
333
            t2.branch.missing_revisions, t1.branch, stop_revision=4)
 
334
 
 
335
        rev4 = t2.commit('four')
 
336
        self.assertRaises(errors.DivergedBranches,
 
337
            self.applyDeprecated, deprecated_in((1, 6, 0)),
 
338
            t2.branch.missing_revisions, t1.branch)
 
339
 
260
340
    def test_nicks(self):
261
 
        """Branch nicknames"""
 
341
        """Test explicit and implicit branch nicknames.
 
342
        
 
343
        Nicknames are implicitly the name of the branch's directory, unless an
 
344
        explicit nickname is set.  That is, an explicit nickname always
 
345
        overrides the implicit one.
 
346
        """
262
347
        t = get_transport(self.get_url())
263
 
        t.mkdir('bzr.dev')
264
348
        branch = self.make_branch('bzr.dev')
 
349
        # The nick will be 'bzr.dev', because there is no explicit nick set.
265
350
        self.assertEqual(branch.nick, 'bzr.dev')
 
351
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
 
352
        # will report its nick as 'bzr.ab'.
266
353
        t.move('bzr.dev', 'bzr.ab')
267
354
        branch = Branch.open(self.get_url('bzr.ab'))
268
355
        self.assertEqual(branch.nick, 'bzr.ab')
269
 
        branch.nick = "Aaron's branch"
270
 
        branch.nick = "Aaron's branch"
271
 
        self.failUnless(
272
 
            t.has(
273
 
                t.relpath(
274
 
                    branch.control_files.controlfilename("branch.conf")
275
 
                    )
276
 
                )
277
 
            )
 
356
        # Set the branch nick explicitly.  This will ensure there's a branch
 
357
        # config file in the branch.
 
358
        branch.nick = "Aaron's branch"
 
359
        if not isinstance(branch, remote.RemoteBranch):
 
360
            self.failUnless(branch._transport.has("branch.conf"))
 
361
        # Because the nick has been set explicitly, the nick is now always
 
362
        # "Aaron's branch", regardless of directory name.
278
363
        self.assertEqual(branch.nick, "Aaron's branch")
279
364
        t.move('bzr.ab', 'integration')
280
365
        branch = Branch.open(self.get_url('integration'))
284
369
 
285
370
    def test_commit_nicks(self):
286
371
        """Nicknames are committed to the revision"""
287
 
        get_transport(self.get_url()).mkdir('bzr.dev')
288
372
        wt = self.make_branch_and_tree('bzr.dev')
289
373
        branch = wt.branch
290
374
        branch.nick = "My happy branch"
291
375
        wt.commit('My commit respect da nick.')
292
376
        committed = branch.repository.get_revision(branch.last_revision())
293
 
        self.assertEqual(committed.properties["branch-nick"], 
 
377
        self.assertEqual(committed.properties["branch-nick"],
294
378
                         "My happy branch")
295
379
 
296
380
    def test_create_open_branch_uses_repository(self):
298
382
            repo = self.make_repository('.', shared=True)
299
383
        except errors.IncompatibleFormat:
300
384
            return
301
 
        repo.bzrdir.root_transport.mkdir('child')
302
 
        child_dir = self.bzrdir_format.initialize('child')
 
385
        child_transport = repo.bzrdir.root_transport.clone('child')
 
386
        child_transport.mkdir('.')
 
387
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
303
388
        try:
304
389
            child_branch = self.branch_format.initialize(child_dir)
305
390
        except errors.UninitializableFormat:
307
392
            return
308
393
        self.assertEqual(repo.bzrdir.root_transport.base,
309
394
                         child_branch.repository.bzrdir.root_transport.base)
310
 
        child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
 
395
        child_branch = branch.Branch.open(self.get_url('child'))
311
396
        self.assertEqual(repo.bzrdir.root_transport.base,
312
397
                         child_branch.repository.bzrdir.root_transport.base)
313
398
 
317
402
        self.failUnless(len(text))
318
403
 
319
404
    def test_get_commit_builder(self):
320
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
321
 
            bzrlib.repository.CommitBuilder)
 
405
        branch = self.make_branch(".")
 
406
        branch.lock_write()
 
407
        builder = branch.get_commit_builder([])
 
408
        self.assertIsInstance(builder, repository.CommitBuilder)
 
409
        branch.repository.commit_write_group()
 
410
        branch.unlock()
 
411
 
 
412
    def test_generate_revision_history(self):
 
413
        """Create a fake revision history easily."""
 
414
        tree = self.make_branch_and_tree('.')
 
415
        rev1 = tree.commit('foo')
 
416
        orig_history = tree.branch.revision_history()
 
417
        rev2 = tree.commit('bar', allow_pointless=True)
 
418
        tree.branch.generate_revision_history(rev1)
 
419
        self.assertEqual(orig_history, tree.branch.revision_history())
 
420
 
 
421
    def test_generate_revision_history_NULL_REVISION(self):
 
422
        tree = self.make_branch_and_tree('.')
 
423
        rev1 = tree.commit('foo')
 
424
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
 
425
        self.assertEqual([], tree.branch.revision_history())
 
426
 
 
427
    def test_create_checkout(self):
 
428
        tree_a = self.make_branch_and_tree('a')
 
429
        branch_a = tree_a.branch
 
430
        checkout_b = branch_a.create_checkout('b')
 
431
        self.assertEqual('null:', checkout_b.last_revision())
 
432
        checkout_b.commit('rev1', rev_id='rev1')
 
433
        self.assertEqual('rev1', branch_a.last_revision())
 
434
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
 
435
 
 
436
        checkout_c = branch_a.create_checkout('c', lightweight=True)
 
437
        self.assertEqual('rev1', checkout_c.last_revision())
 
438
        checkout_c.commit('rev2', rev_id='rev2')
 
439
        self.assertEqual('rev2', branch_a.last_revision())
 
440
        self.assertEqual(checkout_c.branch.base, branch_a.base)
 
441
 
 
442
        os.mkdir('d')
 
443
        checkout_d = branch_a.create_checkout('d', lightweight=True)
 
444
        self.assertEqual('rev2', checkout_d.last_revision())
 
445
        os.mkdir('e')
 
446
        checkout_e = branch_a.create_checkout('e')
 
447
        self.assertEqual('rev2', checkout_e.last_revision())
 
448
 
 
449
    def test_create_anonymous_lightweight_checkout(self):
 
450
        """A lightweight checkout from a readonly branch should succeed."""
 
451
        tree_a = self.make_branch_and_tree('a')
 
452
        rev_id = tree_a.commit('put some content in the branch')
 
453
        # open the branch via a readonly transport
 
454
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
 
455
        # sanity check that the test will be valid
 
456
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
 
457
            source_branch.lock_write)
 
458
        checkout = source_branch.create_checkout('c', lightweight=True)
 
459
        self.assertEqual(rev_id, checkout.last_revision())
 
460
 
 
461
    def test_create_anonymous_heavyweight_checkout(self):
 
462
        """A regular checkout from a readonly branch should succeed."""
 
463
        tree_a = self.make_branch_and_tree('a')
 
464
        rev_id = tree_a.commit('put some content in the branch')
 
465
        # open the branch via a readonly transport
 
466
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
 
467
        # sanity check that the test will be valid
 
468
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
 
469
            source_branch.lock_write)
 
470
        checkout = source_branch.create_checkout('c')
 
471
        self.assertEqual(rev_id, checkout.last_revision())
 
472
 
 
473
    def test_set_revision_history(self):
 
474
        tree = self.make_branch_and_tree('a')
 
475
        tree.commit('a commit', rev_id='rev1')
 
476
        br = tree.branch
 
477
        br.set_revision_history(["rev1"])
 
478
        self.assertEquals(br.revision_history(), ["rev1"])
 
479
        br.set_revision_history([])
 
480
        self.assertEquals(br.revision_history(), [])
322
481
 
323
482
 
324
483
class ChrootedTests(TestCaseWithBranch):
331
490
 
332
491
    def setUp(self):
333
492
        super(ChrootedTests, self).setUp()
334
 
        if not self.transport_server == MemoryServer:
 
493
        if not self.vfs_transport_factory == MemoryServer:
335
494
            self.transport_readonly_server = HttpServer
336
495
 
337
496
    def test_open_containing(self):
345
504
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
346
505
        self.assertEqual('g/p/q', relpath)
347
506
        
348
 
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
349
 
#         >>> from bzrlib.commit import commit
350
 
#         >>> bzrlib.trace.silent = True
351
 
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
352
 
#         >>> br1.working_tree().add('foo')
353
 
#         >>> br1.working_tree().add('bar')
354
 
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
355
 
#         >>> br2 = ScratchBranch()
356
 
#         >>> br2.update_revisions(br1)
357
 
#         Added 2 texts.
358
 
#         Added 1 inventories.
359
 
#         Added 1 revisions.
360
 
#         >>> br2.revision_history()
361
 
#         [u'REVISION-ID-1']
362
 
#         >>> br2.update_revisions(br1)
363
 
#         Added 0 revisions.
364
 
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
365
 
#         True
366
507
 
367
508
class InstrumentedTransaction(object):
368
509
 
427
568
        self.assertEqual(['lw', 'ul'], branch._calls)
428
569
 
429
570
 
430
 
class TestBranchTransaction(TestCaseWithBranch):
431
 
 
432
 
    def setUp(self):
433
 
        super(TestBranchTransaction, self).setUp()
434
 
        self.branch = None
435
 
        
436
 
    def test_default_get_transaction(self):
437
 
        """branch.get_transaction on a new branch should give a PassThrough."""
438
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
439
 
                                   transactions.PassThroughTransaction))
440
 
 
441
 
    def test__set_new_transaction(self):
442
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
443
 
 
444
 
    def test__set_over_existing_transaction_raises(self):
445
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
446
 
        self.assertRaises(errors.LockError,
447
 
                          self.get_branch()._set_transaction,
448
 
                          transactions.ReadOnlyTransaction())
449
 
 
450
 
    def test_finish_no_transaction_raises(self):
451
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
452
 
 
453
 
    def test_finish_readonly_transaction_works(self):
454
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
455
 
        self.get_branch()._finish_transaction()
456
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
457
 
 
458
 
    def test_unlock_calls_finish(self):
459
 
        self.get_branch().lock_read()
460
 
        transaction = InstrumentedTransaction()
461
 
        self.get_branch().control_files._transaction = transaction
462
 
        self.get_branch().unlock()
463
 
        self.assertEqual(['finish'], transaction.calls)
464
 
 
465
 
    def test_lock_read_acquires_ro_transaction(self):
466
 
        self.get_branch().lock_read()
467
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
468
 
                                   transactions.ReadOnlyTransaction))
469
 
        self.get_branch().unlock()
470
 
        
471
 
    def test_lock_write_acquires_write_transaction(self):
472
 
        self.get_branch().lock_write()
473
 
        # cannot use get_transaction as it's magic
474
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
475
 
                                   transactions.WriteTransaction))
476
 
        self.get_branch().unlock()
477
 
 
478
 
 
479
571
class TestBranchPushLocations(TestCaseWithBranch):
480
572
 
481
573
    def test_get_push_location_unset(self):
482
574
        self.assertEqual(None, self.get_branch().get_push_location())
483
575
 
484
576
    def test_get_push_location_exact(self):
485
 
        from bzrlib.config import (branches_config_filename,
 
577
        from bzrlib.config import (locations_config_filename,
486
578
                                   ensure_config_dir_exists)
487
579
        ensure_config_dir_exists()
488
 
        fn = branches_config_filename()
489
 
        print >> open(fn, 'wt'), ("[%s]\n"
490
 
                                  "push_location=foo" %
491
 
                                  self.get_branch().base[:-1])
 
580
        fn = locations_config_filename()
 
581
        open(fn, 'wt').write(("[%s]\n"
 
582
                                  "push_location=foo\n" %
 
583
                                  self.get_branch().base[:-1]))
492
584
        self.assertEqual("foo", self.get_branch().get_push_location())
493
585
 
494
586
    def test_set_push_location(self):
495
 
        from bzrlib.config import (branches_config_filename,
496
 
                                   ensure_config_dir_exists)
497
 
        ensure_config_dir_exists()
498
 
        fn = branches_config_filename()
499
 
        self.get_branch().set_push_location('foo')
500
 
        self.assertFileEqual("[%s]\n"
501
 
                             "push_location = foo" % self.get_branch().base[:-1],
502
 
                             fn)
503
 
 
504
 
    # TODO RBC 20051029 test getting a push location from a branch in a 
505
 
    # recursive section - that is, it appends the branch name.
 
587
        branch = self.get_branch()
 
588
        branch.set_push_location('foo')
 
589
        self.assertEqual('foo', branch.get_push_location())
506
590
 
507
591
 
508
592
class TestFormat(TestCaseWithBranch):
509
593
    """Tests for the format itself."""
510
594
 
 
595
    def test_get_reference(self):
 
596
        """get_reference on all regular branches should return None."""
 
597
        if not self.branch_format.is_supported():
 
598
            # unsupported formats are not loopback testable
 
599
            # because the default open will not open them and
 
600
            # they may not be initializable.
 
601
            return
 
602
        made_branch = self.make_branch('.')
 
603
        self.assertEqual(None,
 
604
            made_branch._format.get_reference(made_branch.bzrdir))
 
605
 
 
606
    def test_set_reference(self):
 
607
        """set_reference on all regular branches should be callable."""
 
608
        if not self.branch_format.is_supported():
 
609
            # unsupported formats are not loopback testable
 
610
            # because the default open will not open them and
 
611
            # they may not be initializable.
 
612
            return
 
613
        this_branch = self.make_branch('this')
 
614
        other_branch = self.make_branch('other')
 
615
        try:
 
616
            this_branch._format.set_reference(this_branch.bzrdir, other_branch)
 
617
        except NotImplementedError:
 
618
            # that's ok
 
619
            pass
 
620
        else:
 
621
            ref = this_branch._format.get_reference(this_branch.bzrdir)
 
622
            self.assertEqual(ref, other_branch.base)
 
623
 
511
624
    def test_format_initialize_find_open(self):
512
625
        # loopback test to check the current format initializes to itself.
513
626
        if not self.branch_format.is_supported():
519
632
        t = get_transport(self.get_url())
520
633
        readonly_t = get_transport(self.get_readonly_url())
521
634
        made_branch = self.make_branch('.')
522
 
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
 
635
        self.failUnless(isinstance(made_branch, branch.Branch))
523
636
 
524
637
        # find it via bzrdir opening:
525
638
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
530
643
                        self.branch_format.__class__))
531
644
 
532
645
        # find it via Branch.open
533
 
        opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
 
646
        opened_branch = branch.Branch.open(readonly_t.base)
534
647
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
535
648
        self.assertEqual(made_branch._format.__class__,
536
649
                         opened_branch._format.__class__)
540
653
        except NotImplementedError:
541
654
            return
542
655
        self.assertEqual(self.branch_format,
543
 
                         bzrlib.branch.BranchFormat.find_format(opened_control))
 
656
                         opened_control.find_branch_format())
 
657
 
 
658
 
 
659
class TestBound(TestCaseWithBranch):
 
660
 
 
661
    def test_bind_unbind(self):
 
662
        branch = self.make_branch('1')
 
663
        branch2 = self.make_branch('2')
 
664
        try:
 
665
            branch.bind(branch2)
 
666
        except errors.UpgradeRequired:
 
667
            raise tests.TestNotApplicable('Format does not support binding')
 
668
        self.assertTrue(branch.unbind())
 
669
        self.assertFalse(branch.unbind())
 
670
        self.assertIs(None, branch.get_bound_location())
 
671
 
 
672
    def test_old_bound_location(self):
 
673
        branch = self.make_branch('branch1')
 
674
        try:
 
675
            self.assertIs(None, branch.get_old_bound_location())
 
676
        except errors.UpgradeRequired:
 
677
            raise tests.TestNotApplicable(
 
678
                    'Format does not store old bound locations')
 
679
        branch2 = self.make_branch('branch2')
 
680
        branch.bind(branch2)
 
681
        self.assertIs(None, branch.get_old_bound_location())
 
682
        branch.unbind()
 
683
        self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
 
684
 
 
685
    def test_bind_diverged(self):
 
686
        tree_a = self.make_branch_and_tree('tree_a')
 
687
        tree_a.commit('rev1a')
 
688
        tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
 
689
        tree_a.commit('rev2a')
 
690
        tree_b.commit('rev2b')
 
691
        try:
 
692
            tree_b.branch.bind(tree_a.branch)
 
693
        except errors.UpgradeRequired:
 
694
            raise tests.TestNotApplicable('Format does not support binding')
 
695
 
 
696
 
 
697
class TestStrict(TestCaseWithBranch):
 
698
 
 
699
    def test_strict_history(self):
 
700
        tree1 = self.make_branch_and_tree('tree1')
 
701
        try:
 
702
            tree1.branch.set_append_revisions_only(True)
 
703
        except errors.UpgradeRequired:
 
704
            raise TestSkipped('Format does not support strict history')
 
705
        tree1.commit('empty commit')
 
706
        tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
 
707
        tree2.commit('empty commit 2')
 
708
        tree1.pull(tree2.branch)
 
709
        tree1.commit('empty commit 3')
 
710
        tree2.commit('empty commit 4')
 
711
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
 
712
        tree2.merge_from_branch(tree1.branch)
 
713
        tree2.commit('empty commit 5')
 
714
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
 
715
                          tree2.branch)
 
716
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
 
717
        tree3.merge_from_branch(tree2.branch)
 
718
        tree3.commit('empty commit 6')
 
719
        tree2.pull(tree3.branch)