/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

[merge] robertc's integration, updated tests to check for retcode=3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2012, 2016 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
 
import contextlib
20
 
 
21
 
from breezy import (
22
 
    branch as _mod_branch,
23
 
    controldir,
24
 
    config,
25
 
    delta as _mod_delta,
26
 
    errors,
27
 
    lock,
28
 
    merge,
29
 
    osutils,
30
 
    urlutils,
31
 
    transform,
32
 
    transport,
33
 
    repository,
34
 
    revision,
35
 
    shelf,
36
 
    tests,
37
 
    tree as _mod_tree,
38
 
    )
39
 
from breezy.bzr import (
40
 
    branch as _mod_bzrbranch,
41
 
    remote,
42
 
    )
43
 
from breezy.sixish import (
44
 
    text_type,
45
 
    )
46
 
from breezy.tests import (
47
 
    per_branch,
48
 
    )
49
 
from breezy.tests.http_server import HttpServer
50
 
from breezy.transport import memory
51
 
 
52
 
 
53
 
class TestTestCaseWithBranch(per_branch.TestCaseWithBranch):
54
 
 
55
 
    def test_branch_format_matches_bzrdir_branch_format(self):
56
 
        bzrdir_branch_format = self.bzrdir_format.get_branch_format()
57
 
        self.assertIs(
58
 
            self.branch_format.__class__,
59
 
            bzrdir_branch_format.__class__)
60
 
 
61
 
    def test_make_branch_gets_expected_format(self):
62
 
        branch = self.make_branch('.')
63
 
        self.assertIs(
64
 
            self.branch_format.__class__,
65
 
            branch._format.__class__)
66
 
 
67
 
 
68
 
class TestBranch(per_branch.TestCaseWithBranch):
69
 
 
70
 
    def test_create_tree_with_merge(self):
71
 
        tree, revmap = self.create_tree_with_merge()
72
 
        tree.lock_read()
73
 
        self.addCleanup(tree.unlock)
74
 
        graph = tree.branch.repository.get_graph()
75
 
        ancestry_graph = graph.get_parent_map(
76
 
            tree.branch.repository.all_revision_ids())
77
 
        self.assertEqual({revmap['1']: (b'null:',),
78
 
                          revmap['2']: (revmap['1'], ),
79
 
                          revmap['1.1.1']: (revmap['1'], ),
80
 
                          revmap['3']: (revmap['2'], revmap['1.1.1'], ),
81
 
                          }, ancestry_graph)
82
 
 
83
 
    def test_revision_ids_are_utf8(self):
84
 
        wt = self.make_branch_and_tree('tree')
85
 
        rev1 = wt.commit('f')
86
 
        rev2 = wt.commit('f')
87
 
        rev3 = wt.commit('f')
88
 
 
89
 
        br = self.get_branch()
90
 
        br.fetch(wt.branch)
91
 
        br.generate_revision_history(rev3)
92
 
        for revision_id in [rev3, rev2, rev1]:
93
 
            self.assertIsInstance(revision_id, bytes)
94
 
        last = br.last_revision()
95
 
        self.assertEqual(rev3, last)
96
 
        self.assertIsInstance(last, bytes)
97
 
        revno, last = br.last_revision_info()
98
 
        self.assertEqual(3, revno)
99
 
        self.assertEqual(rev3, last)
100
 
        self.assertIsInstance(last, bytes)
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
 
 
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
from bzrlib.trace import mutter
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
 
30
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
32
# doesn't request any files that could have been avoided, by 
 
33
# hooking into the Transport.
 
34
 
 
35
class TestBranch(TestCaseInTempDir):
 
36
 
 
37
    def test_append_revisions(self):
 
38
        """Test appending more than one revision"""
 
39
        br = Branch.initialize(".")
 
40
        br.append_revision("rev1")
 
41
        self.assertEquals(br.revision_history(), ["rev1",])
 
42
        br.append_revision("rev2", "rev3")
 
43
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
101
44
 
102
45
    def test_fetch_revisions(self):
103
46
        """Test fetch-revision operation."""
104
 
        wt = self.make_branch_and_tree('b1')
105
 
        b1 = wt.branch
106
 
        self.build_tree_contents([('b1/foo', b'hello')])
107
 
        wt.add(['foo'])
108
 
        rev1 = wt.commit('lala!', allow_pointless=False)
109
 
 
110
 
        b2 = self.make_branch('b2')
111
 
        result = b2.fetch(b1)
112
 
        self.assertIsInstance(result, repository.FetchResult)
113
 
 
114
 
        rev = b2.repository.get_revision(rev1)
115
 
        tree = b2.repository.revision_tree(rev1)
116
 
        tree.lock_read()
117
 
        self.addCleanup(tree.unlock)
118
 
        self.assertEqual(tree.get_file_text('foo'), b'hello')
119
 
 
120
 
    def get_unbalanced_tree_pair(self):
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.add(['foo'], ['foo-id'])
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
55
 
 
56
        mutter('start fetch')
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
121
76
        """Return two branches, a and b, with one file in a."""
122
 
        tree_a = self.make_branch_and_tree('a')
123
 
        self.build_tree_contents([('a/b', b'b')])
124
 
        tree_a.add('b')
125
 
        tree_a.commit("silly commit")
126
 
 
127
 
        tree_b = self.make_branch_and_tree('b')
128
 
        return tree_a, tree_b
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
 
79
        file('a/b', 'wb').write('b')
 
80
        br_a.add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
129
85
 
130
86
    def get_balanced_branch_pair(self):
131
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
132
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
133
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
134
 
        return tree_a, tree_b
135
 
 
136
 
    def test_clone_partial(self):
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
 
111
        os.mkdir('c')
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
114
 
 
115
    def test_copy_partial(self):
137
116
        """Copy only part of the history of a branch."""
138
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
139
 
        #       what should that behaviour be ? Emailed the list.
140
 
        # First, make a branch with two commits.
141
 
        wt_a = self.make_branch_and_tree('a')
142
 
        self.build_tree(['a/one'])
143
 
        wt_a.add(['one'])
144
 
        rev1 = wt_a.commit('commit one')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.add(['one'])
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
145
121
        self.build_tree(['a/two'])
146
 
        wt_a.add(['two'])
147
 
        wt_a.commit('commit two')
148
 
        # Now make a copy of the repository.
149
 
        repo_b = self.make_repository('b')
150
 
        wt_a.branch.repository.copy_content_into(repo_b)
151
 
        # wt_a might be a lightweight checkout, so get a hold of the actual
152
 
        # branch (because you can't do a partial clone of a lightweight
153
 
        # checkout).
154
 
        branch = wt_a.branch.controldir.open_branch()
155
 
        # Then make a branch where the new repository is, but specify a revision
156
 
        # ID.  The new branch's history will stop at the specified revision.
157
 
        br_b = branch.clone(repo_b.controldir, revision_id=rev1)
158
 
        self.assertEqual(rev1, br_b.last_revision())
159
 
 
160
 
    def get_parented_branch(self):
161
 
        wt_a = self.make_branch_and_tree('a')
162
 
        self.build_tree(['a/one'])
163
 
        wt_a.add(['one'])
164
 
        rev1 = wt_a.commit('commit one')
165
 
 
166
 
        branch_b = wt_a.branch.controldir.sprout(
167
 
            'b', revision_id=rev1).open_branch()
168
 
        self.assertEqual(
169
 
            urlutils.strip_segment_parameters(wt_a.branch.user_url),
170
 
            urlutils.strip_segment_parameters(branch_b.get_parent()))
171
 
        return branch_b
172
 
 
173
 
    def test_clone_branch_nickname(self):
174
 
        # test the nick name is preserved always
175
 
        raise tests.TestSkipped('XXX branch cloning is not yet tested.')
176
 
 
177
 
    def test_clone_branch_parent(self):
178
 
        # test the parent is preserved always
179
 
        branch_b = self.get_parented_branch()
180
 
        repo_c = self.make_repository('c')
181
 
        branch_b.repository.copy_content_into(repo_c)
182
 
        branch_c = branch_b.clone(repo_c.controldir)
183
 
        self.assertNotEqual(None, branch_c.get_parent())
184
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
185
 
 
186
 
        # We can also set a specific parent, and it should be honored
187
 
        random_parent = 'http://example.com/path/to/branch'
188
 
        branch_b.set_parent(random_parent)
189
 
        repo_d = self.make_repository('d')
190
 
        branch_b.repository.copy_content_into(repo_d)
191
 
        branch_d = branch_b.clone(repo_d.controldir)
192
 
        self.assertEqual(random_parent, branch_d.get_parent())
193
 
 
194
 
    def test_submit_branch(self):
195
 
        """Submit location can be queried and set"""
196
 
        branch = self.make_branch('branch')
197
 
        self.assertEqual(branch.get_submit_branch(), None)
198
 
        branch.set_submit_branch('sftp://example.com')
199
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
200
 
        branch.set_submit_branch('sftp://example.net')
201
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
202
 
 
203
 
    def test_public_branch(self):
204
 
        """public location can be queried and set"""
205
 
        branch = self.make_branch('branch')
206
 
        self.assertEqual(branch.get_public_branch(), None)
207
 
        branch.set_public_branch('sftp://example.com')
208
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
209
 
        branch.set_public_branch('sftp://example.net')
210
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
211
 
        branch.set_public_branch(None)
212
 
        self.assertEqual(branch.get_public_branch(), None)
213
 
 
214
 
    def test_record_initial_ghost(self):
215
 
        """Branches should support having ghosts."""
216
 
        wt = self.make_branch_and_tree('.')
217
 
        if not wt.branch.repository._format.supports_ghosts:
218
 
            raise tests.TestNotApplicable("repository format does not "
219
 
                                          "support ghosts")
220
 
        wt.set_parent_ids([b'non:existent@rev--ision--0--2'],
221
 
                          allow_leftmost_as_ghost=True)
222
 
        self.assertEqual([b'non:existent@rev--ision--0--2'],
223
 
                         wt.get_parent_ids())
224
 
        rev_id = wt.commit('commit against a ghost first parent.')
225
 
        rev = wt.branch.repository.get_revision(rev_id)
226
 
        self.assertEqual(rev.parent_ids, [b'non:existent@rev--ision--0--2'])
 
122
        br_a.add(['two'])
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
 
128
        
 
129
    def test_record_initial_ghost_merge(self):
 
130
        """A pending merge with no revision present is still a merge."""
 
131
        branch = Branch.initialize('.')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
 
135
        self.assertEqual(len(rev.parent_ids), 1)
227
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
228
137
        self.assertEqual(len(rev.parent_sha1s), 0)
229
 
 
230
 
    def test_record_two_ghosts(self):
231
 
        """Recording with all ghosts works."""
232
 
        wt = self.make_branch_and_tree('.')
233
 
        if not wt.branch.repository._format.supports_ghosts:
234
 
            raise tests.TestNotApplicable("repository format does not "
235
 
                                          "support ghosts")
236
 
        wt.set_parent_ids([
237
 
            b'foo@azkhazan-123123-abcabc',
238
 
            b'wibble@fofof--20050401--1928390812',
239
 
            ],
240
 
            allow_leftmost_as_ghost=True)
241
 
        rev_id = wt.commit("commit from ghost base with one merge")
242
 
        # the revision should have been committed with two parents
243
 
        rev = wt.branch.repository.get_revision(rev_id)
244
 
        self.assertEqual([b'foo@azkhazan-123123-abcabc',
245
 
                          b'wibble@fofof--20050401--1928390812'],
246
 
                         rev.parent_ids)
 
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
247
139
 
248
140
    def test_bad_revision(self):
249
 
        self.assertRaises(errors.InvalidRevisionId,
250
 
                          self.get_branch().repository.get_revision,
251
 
                          None)
252
 
 
253
 
    def test_nicks_bzr(self):
254
 
        """Test the behaviour of branch nicks specific to bzr branches.
255
 
 
256
 
        Nicknames are implicitly the name of the branch's directory, unless an
257
 
        explicit nickname is set.  That is, an explicit nickname always
258
 
        overrides the implicit one.
259
 
 
260
 
        """
261
 
        t = self.get_transport()
262
 
        branch = self.make_branch('bzr.dev')
263
 
        if not isinstance(branch, _mod_bzrbranch.BzrBranch):
264
 
            raise tests.TestNotApplicable("not a bzr branch format")
265
 
        # The nick will be 'bzr.dev', because there is no explicit nick set.
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
143
 
 
144
# TODO 20051003 RBC:
 
145
# compare the gpg-to-sign info for a commit with a ghost and 
 
146
#     an identical tree without a ghost
 
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
148
        
 
149
    def test_pending_merges(self):
 
150
        """Tracking pending-merged revisions."""
 
151
        b = Branch.initialize('.')
 
152
        wt = b.working_tree()
 
153
        self.assertEquals(wt.pending_merges(), [])
 
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(wt.pending_merges(),
 
160
                          ['foo@azkhazan-123123-abcabc',
 
161
                           'wibble@fofof--20050401--1928390812'])
 
162
        b.working_tree().commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
 
164
        self.assertEquals(len(rev.parent_ids), 2)
 
165
        self.assertEquals(rev.parent_ids[0],
 
166
                          'foo@azkhazan-123123-abcabc')
 
167
        self.assertEquals(rev.parent_ids[1],
 
168
                           'wibble@fofof--20050401--1928390812')
 
169
        # list should be cleared when we do a commit
 
170
        self.assertEquals(wt.pending_merges(), [])
 
171
 
 
172
    def test_sign_existing_revision(self):
 
173
        branch = Branch.initialize('.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
175
        from bzrlib.testament import Testament
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
 
179
 
 
180
    def test_store_signature(self):
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
193
 
 
194
    def test_nicks(self):
 
195
        """Branch nicknames"""
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
266
198
        self.assertEqual(branch.nick, 'bzr.dev')
267
 
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
268
 
        # will report its nick as 'bzr.ab'.
269
 
        t.move('bzr.dev', 'bzr.ab')
270
 
        branch = _mod_branch.Branch.open(self.get_url('bzr.ab'))
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
271
201
        self.assertEqual(branch.nick, 'bzr.ab')
272
 
        # Set the branch nick explicitly.  This will ensure there's a branch
273
 
        # config file in the branch.
274
 
        branch.nick = "Aaron's branch"
275
 
        if not isinstance(branch, remote.RemoteBranch):
276
 
            self.assertTrue(branch._transport.has("branch.conf"))
277
 
        # Because the nick has been set explicitly, the nick is now always
278
 
        # "Aaron's branch", regardless of directory name.
279
 
        self.assertEqual(branch.nick, "Aaron's branch")
280
 
        t.move('bzr.ab', 'integration')
281
 
        branch = _mod_branch.Branch.open(self.get_url('integration'))
282
 
        self.assertEqual(branch.nick, "Aaron's branch")
283
 
        branch.nick = u"\u1234"
284
 
        self.assertEqual(branch.nick, u"\u1234")
285
 
 
286
 
    def test_nicks(self):
287
 
        """Test explicit and implicit branch nicknames.
288
 
 
289
 
        A nickname is always available, whether set explicitly or not.
290
 
        """
291
 
        t = self.get_transport()
292
 
        branch = self.make_branch('bzr.dev')
293
 
        # An implicit nick name is set; what it is exactly depends on the
294
 
        # format.
295
 
        self.assertIsInstance(branch.nick, text_type)
296
 
        # Set the branch nick explicitly.
297
 
        branch.nick = u"Aaron's branch"
298
 
        # Because the nick has been set explicitly, the nick is now always
299
 
        # "Aaron's branch".
300
 
        self.assertEqual(branch.nick, u"Aaron's branch")
 
202
        branch.nick = "Aaron's branch"
 
203
        branch.nick = "Aaron's branch"
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
205
        self.assertEqual(branch.nick, "Aaron's branch")
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
 
208
        self.assertEqual(branch.nick, "Aaron's branch")
301
209
        branch.nick = u"\u1234"
302
210
        self.assertEqual(branch.nick, u"\u1234")
303
211
 
304
212
    def test_commit_nicks(self):
305
213
        """Nicknames are committed to the revision"""
306
 
        wt = self.make_branch_and_tree('bzr.dev')
307
 
        branch = wt.branch
 
214
        os.mkdir('bzr.dev')
 
215
        branch = Branch.initialize('bzr.dev')
308
216
        branch.nick = "My happy branch"
309
 
        wt.commit('My commit respect da nick.')
310
 
        committed = branch.repository.get_revision(branch.last_revision())
311
 
        if branch.repository._format.supports_storing_branch_nick:
312
 
            self.assertEqual(committed.properties["branch-nick"],
313
 
                             "My happy branch")
314
 
        else:
315
 
            self.assertNotIn("branch-nick", committed.properties)
316
 
 
317
 
    def test_create_colocated(self):
318
 
        try:
319
 
            repo = self.make_repository('.', shared=True)
320
 
        except errors.IncompatibleFormat:
321
 
            return
322
 
        if repo.controldir._format.colocated_branches:
323
 
            raise tests.TestNotApplicable(
324
 
                "control dir does not support colocated branches")
325
 
        self.assertEqual(0, len(repo.controldir.list_branches()))
326
 
        if not self.bzrdir_format.colocated_branches:
327
 
            raise tests.TestNotApplicable("control dir format does not support "
328
 
                                          "colocated branches")
329
 
        try:
330
 
            child_branch1 = self.branch_format.initialize(repo.controldir,
331
 
                                                          name='branch1')
332
 
        except errors.UninitializableFormat:
333
 
            # branch references are not default init'able and
334
 
            # not all bzrdirs support colocated branches.
335
 
            return
336
 
        self.assertEqual(1, len(repo.controldir.list_branches()))
337
 
        self.branch_format.initialize(repo.controldir, name='branch2')
338
 
        self.assertEqual(2, len(repo.controldir.list_branches()))
339
 
 
340
 
    def test_create_append_revisions_only(self):
341
 
        try:
342
 
            repo = self.make_repository('.', shared=True)
343
 
        except errors.IncompatibleFormat:
344
 
            return
345
 
        for val in (True, False):
346
 
            try:
347
 
                branch = self.branch_format.initialize(repo.controldir,
348
 
                                                       append_revisions_only=True)
349
 
            except (errors.UninitializableFormat, errors.UpgradeRequired):
350
 
                # branch references are not default init'able and
351
 
                # not all branches support append_revisions_only
352
 
                return
353
 
            self.assertEqual(True, branch.get_append_revisions_only())
354
 
            repo.controldir.destroy_branch()
355
 
 
356
 
    def test_get_set_append_revisions_only(self):
357
 
        branch = self.make_branch('.')
358
 
        if branch._format.supports_set_append_revisions_only():
359
 
            branch.set_append_revisions_only(True)
360
 
            self.assertTrue(branch.get_append_revisions_only())
361
 
            branch.set_append_revisions_only(False)
362
 
            self.assertFalse(branch.get_append_revisions_only())
363
 
        else:
364
 
            self.assertRaises(errors.UpgradeRequired,
365
 
                              branch.set_append_revisions_only, True)
366
 
            self.assertFalse(branch.get_append_revisions_only())
367
 
 
368
 
    def test_create_open_branch_uses_repository(self):
369
 
        try:
370
 
            repo = self.make_repository('.', shared=True)
371
 
        except errors.IncompatibleFormat:
372
 
            raise tests.TestNotApplicable("requires shared repository support")
373
 
        child_transport = repo.controldir.root_transport.clone('child')
374
 
        child_transport.mkdir('.')
375
 
        try:
376
 
            child_dir = self.bzrdir_format.initialize_on_transport(
377
 
                child_transport)
378
 
        except errors.UninitializableFormat:
379
 
            raise tests.TestNotApplicable(
380
 
                "control dir format not initializable")
381
 
        try:
382
 
            child_branch = self.branch_format.initialize(child_dir)
383
 
        except errors.UninitializableFormat:
384
 
            # branch references are not default init'able.
385
 
            return
386
 
        self.assertEqual(repo.controldir.root_transport.base,
387
 
                         child_branch.repository.controldir.root_transport.base)
388
 
        child_branch = _mod_branch.Branch.open(self.get_url('child'))
389
 
        self.assertEqual(repo.controldir.root_transport.base,
390
 
                         child_branch.repository.controldir.root_transport.base)
391
 
 
392
 
    def test_format_description(self):
393
 
        tree = self.make_branch_and_tree('tree')
394
 
        text = tree.branch._format.get_format_description()
395
 
        self.assertTrue(len(text))
396
 
 
397
 
    def test_get_commit_builder(self):
398
 
        branch = self.make_branch(".")
399
 
        branch.lock_write()
400
 
        builder = branch.get_commit_builder([])
401
 
        self.assertIsInstance(builder, repository.CommitBuilder)
402
 
        branch.repository.commit_write_group()
403
 
        branch.unlock()
404
 
 
405
 
    def test_generate_revision_history(self):
406
 
        """Create a fake revision history easily."""
407
 
        tree = self.make_branch_and_tree('.')
408
 
        rev1 = tree.commit('foo')
409
 
        tree.lock_write()
410
 
        self.addCleanup(tree.unlock)
411
 
        graph = tree.branch.repository.get_graph()
412
 
        orig_history = list(
413
 
            graph.iter_lefthand_ancestry(
414
 
                tree.branch.last_revision(), [revision.NULL_REVISION]))
415
 
        rev2 = tree.commit('bar', allow_pointless=True)
416
 
        tree.branch.generate_revision_history(rev1)
417
 
        self.assertEqual(orig_history, list(
418
 
            graph.iter_lefthand_ancestry(
419
 
                tree.branch.last_revision(), [revision.NULL_REVISION])))
420
 
 
421
 
    def test_generate_revision_history_NULL_REVISION(self):
422
 
        tree = self.make_branch_and_tree('.')
423
 
        rev1 = tree.commit('foo')
424
 
        tree.lock_write()
425
 
        self.addCleanup(tree.unlock)
426
 
        tree.branch.generate_revision_history(revision.NULL_REVISION)
427
 
        self.assertEqual(revision.NULL_REVISION, tree.branch.last_revision())
428
 
 
429
 
    def test_create_checkout(self):
430
 
        tree_a = self.make_branch_and_tree('a')
431
 
        branch_a = tree_a.branch
432
 
        checkout_b = branch_a.create_checkout('b')
433
 
        self.assertEqual(b'null:', checkout_b.last_revision())
434
 
        try:
435
 
            rev1 = checkout_b.commit('rev1')
436
 
        except errors.NoRoundtrippingSupport:
437
 
            raise tests.TestNotApplicable(
438
 
                'roundtripping between %r and %r not supported' %
439
 
                (checkout_b.branch, checkout_b.branch.get_master_branch()))
440
 
        self.assertEqual(rev1, branch_a.last_revision())
441
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
442
 
 
443
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
444
 
        self.assertEqual(rev1, checkout_c.last_revision())
445
 
        rev2 = checkout_c.commit('rev2')
446
 
        self.assertEqual(rev2, branch_a.last_revision())
447
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
448
 
 
449
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
450
 
        self.assertEqual(rev2, checkout_d.last_revision())
451
 
        checkout_e = branch_a.create_checkout('e')
452
 
        self.assertEqual(rev2, checkout_e.last_revision())
453
 
 
454
 
    def test_create_anonymous_lightweight_checkout(self):
455
 
        """A lightweight checkout from a readonly branch should succeed."""
456
 
        tree_a = self.make_branch_and_tree('a')
457
 
        rev_id = tree_a.commit('put some content in the branch')
458
 
        # open the branch via a readonly transport
459
 
        url = self.get_readonly_url(urlutils.basename(tree_a.branch.base))
460
 
        t = transport.get_transport_from_url(url)
461
 
        if not tree_a.branch.controldir._format.supports_transport(t):
462
 
            raise tests.TestNotApplicable("format does not support transport")
463
 
        source_branch = _mod_branch.Branch.open(url)
464
 
        # sanity check that the test will be valid
465
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
466
 
                          source_branch.lock_write)
467
 
        checkout = source_branch.create_checkout('c', lightweight=True)
468
 
        self.assertEqual(rev_id, checkout.last_revision())
469
 
 
470
 
    def test_create_anonymous_heavyweight_checkout(self):
471
 
        """A regular checkout from a readonly branch should succeed."""
472
 
        tree_a = self.make_branch_and_tree('a')
473
 
        rev_id = tree_a.commit('put some content in the branch')
474
 
        # open the branch via a readonly transport
475
 
        url = self.get_readonly_url(
476
 
            osutils.basename(tree_a.branch.base.rstrip('/')))
477
 
        t = transport.get_transport_from_url(url)
478
 
        if not tree_a.branch.controldir._format.supports_transport(t):
479
 
            raise tests.TestNotApplicable("format does not support transport")
480
 
        source_branch = _mod_branch.Branch.open(url)
481
 
        # sanity check that the test will be valid
482
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
483
 
                          source_branch.lock_write)
484
 
        checkout = source_branch.create_checkout('c')
485
 
        self.assertEqual(rev_id, checkout.last_revision())
486
 
 
487
 
    def test_heads_to_fetch(self):
488
 
        # heads_to_fetch is a method that returns a collection of revids that
489
 
        # need to be fetched to copy this branch into another repo.  At a
490
 
        # minimum this will include the tip.
491
 
        # (In native formats, this is the tip + tags, but other formats may
492
 
        # have other revs needed)
493
 
        tree = self.make_branch_and_tree('a')
494
 
        tree.commit('first commit')
495
 
        rev2 = tree.commit('second commit')
496
 
        must_fetch, should_fetch = tree.branch.heads_to_fetch()
497
 
        self.assertTrue(rev2 in must_fetch)
498
 
 
499
 
    def test_heads_to_fetch_not_null_revision(self):
500
 
        # NULL_REVISION does not appear in the result of heads_to_fetch, even
501
 
        # for an empty branch.
502
 
        tree = self.make_branch_and_tree('a')
503
 
        must_fetch, should_fetch = tree.branch.heads_to_fetch()
504
 
        self.assertFalse(revision.NULL_REVISION in must_fetch)
505
 
        self.assertFalse(revision.NULL_REVISION in should_fetch)
506
 
 
507
 
    def test_create_memorytree(self):
508
 
        tree = self.make_branch_and_tree('a')
509
 
        self.assertIsInstance(tree.branch.create_memorytree(), _mod_tree.Tree)
510
 
 
511
 
 
512
 
class TestBranchFormat(per_branch.TestCaseWithBranch):
513
 
 
514
 
    def test_branch_format_network_name(self):
515
 
        br = self.make_branch('.')
516
 
        format = br._format
517
 
        network_name = format.network_name()
518
 
        self.assertIsInstance(network_name, bytes)
519
 
        # We want to test that the network_name matches the actual format on
520
 
        # disk. For local branches that means that using network_name as a key
521
 
        # in the registry gives back the same format. For remote branches we
522
 
        # check that the network_name of the RemoteBranchFormat we have locally
523
 
        # matches the actual format present on disk.
524
 
        if isinstance(format, remote.RemoteBranchFormat):
525
 
            br._ensure_real()
526
 
            real_branch = br._real_branch
527
 
            self.assertEqual(real_branch._format.network_name(), network_name)
528
 
        else:
529
 
            registry = _mod_branch.network_format_registry
530
 
            looked_up_format = registry.get(network_name)
531
 
            self.assertEqual(format.__class__, looked_up_format.__class__)
532
 
 
533
 
    def test_get_config_calls(self):
534
 
        # Smoke test that all branch succeed getting a config
535
 
        br = self.make_branch('.')
536
 
        br.get_config()
537
 
        br.get_config_stack()
538
 
 
539
 
 
540
 
class ChrootedTests(per_branch.TestCaseWithBranch):
541
 
    """A support class that provides readonly urls outside the local namespace.
542
 
 
543
 
    This is done by checking if self.transport_server is a MemoryServer. if it
544
 
    is then we are chrooted already, if it is not then an HttpServer is used
545
 
    for readonly urls.
546
 
    """
547
 
 
548
 
    def setUp(self):
549
 
        super(ChrootedTests, self).setUp()
550
 
        if not self.vfs_transport_factory == memory.MemoryServer:
551
 
            self.transport_readonly_server = HttpServer
 
217
        branch.working_tree().commit('My commit respect da nick.')
 
218
        committed = branch.get_revision(branch.last_revision())
 
219
        self.assertEqual(committed.properties["branch-nick"], 
 
220
                         "My happy branch")
 
221
 
 
222
 
 
223
class TestRemote(TestCaseWithWebserver):
552
224
 
553
225
    def test_open_containing(self):
554
 
        self.assertRaises(errors.NotBranchError,
555
 
                          _mod_branch.Branch.open_containing,
556
 
                          self.get_readonly_url(''))
557
 
        self.assertRaises(errors.NotBranchError,
558
 
                          _mod_branch.Branch.open_containing,
559
 
                          self.get_readonly_url('g/p/q'))
560
 
        branch = self.make_branch('.')
561
 
        if not branch.controldir._format.supports_transport(
562
 
                transport.get_transport_from_url(self.get_readonly_url('.'))):
563
 
            raise tests.TestNotApplicable("format does not support transport")
564
 
        branch, relpath = _mod_branch.Branch.open_containing(
565
 
            self.get_readonly_url(''))
 
226
        self.assertRaises(NotBranchError, Branch.open_containing,
 
227
                          self.get_remote_url(''))
 
228
        self.assertRaises(NotBranchError, Branch.open_containing,
 
229
                          self.get_remote_url('g/p/q'))
 
230
        b = Branch.initialize('.')
 
231
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
566
232
        self.assertEqual('', relpath)
567
 
        branch, relpath = _mod_branch.Branch.open_containing(
568
 
            self.get_readonly_url('g/p/q'))
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
569
234
        self.assertEqual('g/p/q', relpath)
570
 
 
 
235
        
 
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
237
#         >>> from bzrlib.commit import commit
 
238
#         >>> bzrlib.trace.silent = True
 
239
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
240
#         >>> br1.add('foo')
 
241
#         >>> br1.add('bar')
 
242
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
243
#         >>> br2 = ScratchBranch()
 
244
#         >>> br2.update_revisions(br1)
 
245
#         Added 2 texts.
 
246
#         Added 1 inventories.
 
247
#         Added 1 revisions.
 
248
#         >>> br2.revision_history()
 
249
#         [u'REVISION-ID-1']
 
250
#         >>> br2.update_revisions(br1)
 
251
#         Added 0 revisions.
 
252
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
253
#         True
571
254
 
572
255
class InstrumentedTransaction(object):
573
256
 
585
268
 
586
269
    def lock_read(self):
587
270
        self._calls.append('lr')
588
 
        return lock.LogicalLockResult(self.unlock)
589
271
 
590
272
    def lock_write(self):
591
273
        self._calls.append('lw')
592
 
        return lock.LogicalLockResult(self.unlock)
593
274
 
594
275
    def unlock(self):
595
276
        self._calls.append('ul')
596
277
 
 
278
    @needs_read_lock
597
279
    def do_with_read(self):
598
 
        with self.lock_read():
599
 
            return 1
 
280
        return 1
600
281
 
 
282
    @needs_read_lock
601
283
    def except_with_read(self):
602
 
        with self.lock_read():
603
 
            raise RuntimeError
604
 
 
605
 
 
606
 
class TestBranchPushLocations(per_branch.TestCaseWithBranch):
607
 
 
 
284
        raise RuntimeError
 
285
 
 
286
    @needs_write_lock
 
287
    def do_with_write(self):
 
288
        return 2
 
289
 
 
290
    @needs_write_lock
 
291
    def except_with_write(self):
 
292
        raise RuntimeError
 
293
 
 
294
 
 
295
class TestDecorators(TestCase):
 
296
 
 
297
    def test_needs_read_lock(self):
 
298
        branch = TestDecorator()
 
299
        self.assertEqual(1, branch.do_with_read())
 
300
        self.assertEqual(['lr', 'ul'], branch._calls)
 
301
 
 
302
    def test_excepts_in_read_lock(self):
 
303
        branch = TestDecorator()
 
304
        self.assertRaises(RuntimeError, branch.except_with_read)
 
305
        self.assertEqual(['lr', 'ul'], branch._calls)
 
306
 
 
307
    def test_needs_write_lock(self):
 
308
        branch = TestDecorator()
 
309
        self.assertEqual(2, branch.do_with_write())
 
310
        self.assertEqual(['lw', 'ul'], branch._calls)
 
311
 
 
312
    def test_excepts_in_write_lock(self):
 
313
        branch = TestDecorator()
 
314
        self.assertRaises(RuntimeError, branch.except_with_write)
 
315
        self.assertEqual(['lw', 'ul'], branch._calls)
 
316
 
 
317
 
 
318
class TestBranchTransaction(TestCaseInTempDir):
 
319
 
 
320
    def setUp(self):
 
321
        super(TestBranchTransaction, self).setUp()
 
322
        self.branch = Branch.initialize('.')
 
323
        
 
324
    def test_default_get_transaction(self):
 
325
        """branch.get_transaction on a new branch should give a PassThrough."""
 
326
        self.failUnless(isinstance(self.branch.get_transaction(),
 
327
                                   transactions.PassThroughTransaction))
 
328
 
 
329
    def test__set_new_transaction(self):
 
330
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
331
 
 
332
    def test__set_over_existing_transaction_raises(self):
 
333
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
334
        self.assertRaises(errors.LockError,
 
335
                          self.branch._set_transaction,
 
336
                          transactions.ReadOnlyTransaction())
 
337
 
 
338
    def test_finish_no_transaction_raises(self):
 
339
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
340
 
 
341
    def test_finish_readonly_transaction_works(self):
 
342
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
343
        self.branch._finish_transaction()
 
344
        self.assertEqual(None, self.branch._transaction)
 
345
 
 
346
    def test_unlock_calls_finish(self):
 
347
        self.branch.lock_read()
 
348
        transaction = InstrumentedTransaction()
 
349
        self.branch._transaction = transaction
 
350
        self.branch.unlock()
 
351
        self.assertEqual(['finish'], transaction.calls)
 
352
 
 
353
    def test_lock_read_acquires_ro_transaction(self):
 
354
        self.branch.lock_read()
 
355
        self.failUnless(isinstance(self.branch.get_transaction(),
 
356
                                   transactions.ReadOnlyTransaction))
 
357
        self.branch.unlock()
 
358
        
 
359
    def test_lock_write_acquires_passthrough_transaction(self):
 
360
        self.branch.lock_write()
 
361
        # cannot use get_transaction as its magic
 
362
        self.failUnless(isinstance(self.branch._transaction,
 
363
                                   transactions.PassThroughTransaction))
 
364
        self.branch.unlock()
 
365
 
 
366
 
 
367
class TestBranchPushLocations(TestCaseInTempDir):
 
368
 
 
369
    def setUp(self):
 
370
        super(TestBranchPushLocations, self).setUp()
 
371
        self.branch = Branch.initialize('.')
 
372
        
608
373
    def test_get_push_location_unset(self):
609
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
374
        self.assertEqual(None, self.branch.get_push_location())
610
375
 
611
376
    def test_get_push_location_exact(self):
612
 
        b = self.get_branch()
613
 
        config.LocationConfig.from_string(
614
 
            '[%s]\npush_location=foo\n' % (b.base,), b.base, save=True)
615
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
377
        self.build_tree(['.bazaar/'])
 
378
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
379
                                                       "push_location=foo" %
 
380
                                                       os.getcwdu())
 
381
        self.assertEqual("foo", self.branch.get_push_location())
616
382
 
617
383
    def test_set_push_location(self):
618
 
        branch = self.get_branch()
619
 
        branch.set_push_location('foo')
620
 
        self.assertEqual('foo', branch.get_push_location())
621
 
 
622
 
 
623
 
class TestChildSubmitFormats(per_branch.TestCaseWithBranch):
624
 
 
625
 
    def test_get_child_submit_format_default(self):
626
 
        submit_format = self.get_branch().get_child_submit_format()
627
 
        self.assertTrue(submit_format is None or
628
 
                        isinstance(submit_format, str))
629
 
 
630
 
    def test_get_child_submit_format(self):
631
 
        branch = self.get_branch()
632
 
        branch.get_config_stack().set('child_submit_format', '10')
633
 
        branch = self.get_branch()
634
 
        self.assertEqual('10', branch.get_child_submit_format())
635
 
 
636
 
 
637
 
class TestFormat(per_branch.TestCaseWithBranch):
638
 
    """Tests for the format itself."""
639
 
 
640
 
    def test_get_reference(self):
641
 
        """get_reference on all regular branches should return None."""
642
 
        if not self.branch_format.is_supported():
643
 
            # unsupported formats are not loopback testable
644
 
            # because the default open will not open them and
645
 
            # they may not be initializable.
646
 
            return
647
 
        made_controldir = self.make_controldir('.')
648
 
        made_controldir.create_repository()
649
 
        if made_controldir._format.colocated_branches:
650
 
            # Formats that support colocated branches sometimes have a default
651
 
            # branch that is a reference branch (such as Git). Cope with
652
 
            # those by creating a different colocated branch.
653
 
            name = 'foo'
654
 
        else:
655
 
            name = None
656
 
        try:
657
 
            made_branch = made_controldir.create_branch(name)
658
 
        except errors.UninitializableFormat:
659
 
            raise tests.TestNotApplicable('Uninitializable branch format')
660
 
 
661
 
        self.assertEqual(None,
662
 
                         made_branch._format.get_reference(made_branch.controldir, name))
663
 
 
664
 
    def test_set_reference(self):
665
 
        """set_reference on all regular branches should be callable."""
666
 
        if not self.branch_format.is_supported():
667
 
            # unsupported formats are not loopback testable
668
 
            # because the default open will not open them and
669
 
            # they may not be initializable.
670
 
            return
671
 
        this_branch = self.make_branch('this')
672
 
        other_branch = self.make_branch('other')
673
 
        try:
674
 
            this_branch._format.set_reference(this_branch.controldir, None,
675
 
                                              other_branch)
676
 
        except (NotImplementedError, errors.IncompatibleFormat):
677
 
            # that's ok
678
 
            pass
679
 
        else:
680
 
            ref = this_branch._format.get_reference(this_branch.controldir)
681
 
            self.assertEqual(ref, other_branch.user_url)
682
 
 
683
 
    def test_format_initialize_find_open(self):
684
 
        # loopback test to check the current format initializes to itself.
685
 
        if not self.branch_format.is_supported():
686
 
            # unsupported formats are not loopback testable
687
 
            # because the default open will not open them and
688
 
            # they may not be initializable.
689
 
            return
690
 
        # supported formats must be able to init and open
691
 
        t = self.get_transport()
692
 
        readonly_t = transport.get_transport_from_url(self.get_readonly_url())
693
 
        made_branch = self.make_branch('.')
694
 
        self.assertIsInstance(made_branch, _mod_branch.Branch)
695
 
 
696
 
        # find it via bzrdir opening:
697
 
        opened_control = controldir.ControlDir.open(readonly_t.base)
698
 
        direct_opened_branch = opened_control.open_branch()
699
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
700
 
        self.assertEqual(opened_control, direct_opened_branch.controldir)
701
 
        self.assertIsInstance(direct_opened_branch._format,
702
 
                              self.branch_format.__class__)
703
 
 
704
 
        # find it via Branch.open
705
 
        opened_branch = _mod_branch.Branch.open(readonly_t.base)
706
 
        self.assertIsInstance(opened_branch, made_branch.__class__)
707
 
        self.assertEqual(made_branch._format.__class__,
708
 
                         opened_branch._format.__class__)
709
 
        # if it has a unique id string, can we probe for it ?
710
 
        try:
711
 
            self.branch_format.get_format_string()
712
 
        except NotImplementedError:
713
 
            return
714
 
        self.assertEqual(self.branch_format,
715
 
                         opened_control.find_branch_format())
716
 
 
717
 
 
718
 
class TestBound(per_branch.TestCaseWithBranch):
719
 
 
720
 
    def test_bind_unbind(self):
721
 
        branch = self.make_branch('1')
722
 
        branch2 = self.make_branch('2')
723
 
        try:
724
 
            branch.bind(branch2)
725
 
        except errors.UpgradeRequired:
726
 
            raise tests.TestNotApplicable('Format does not support binding')
727
 
        self.assertTrue(branch.unbind())
728
 
        self.assertFalse(branch.unbind())
729
 
        self.assertIs(None, branch.get_bound_location())
730
 
 
731
 
    def test_old_bound_location(self):
732
 
        branch = self.make_branch('branch1')
733
 
        try:
734
 
            self.assertIs(None, branch.get_old_bound_location())
735
 
        except errors.UpgradeRequired:
736
 
            raise tests.TestNotApplicable(
737
 
                'Format does not store old bound locations')
738
 
        branch2 = self.make_branch('branch2')
739
 
        branch.bind(branch2)
740
 
        self.assertIs(None, branch.get_old_bound_location())
741
 
        branch.unbind()
742
 
        self.assertContainsRe(
743
 
            branch.get_old_bound_location(), '\\/branch2\\/$')
744
 
 
745
 
    def test_bind_diverged(self):
746
 
        tree_a = self.make_branch_and_tree('tree_a')
747
 
        tree_a.commit('rev1a')
748
 
        tree_b = tree_a.controldir.sprout('tree_b').open_workingtree()
749
 
        tree_a.commit('rev2a')
750
 
        tree_b.commit('rev2b')
751
 
        try:
752
 
            tree_b.branch.bind(tree_a.branch)
753
 
        except errors.UpgradeRequired:
754
 
            raise tests.TestNotApplicable('Format does not support binding')
755
 
 
756
 
    def test_unbind_clears_cached_master_branch(self):
757
 
        """b.unbind clears any cached value of b.get_master_branch."""
758
 
        master = self.make_branch('master')
759
 
        branch = self.make_branch('branch')
760
 
        try:
761
 
            branch.bind(master)
762
 
        except errors.UpgradeRequired:
763
 
            raise tests.TestNotApplicable('Format does not support binding')
764
 
        self.addCleanup(branch.lock_write().unlock)
765
 
        self.assertNotEqual(None, branch.get_master_branch())
766
 
        branch.unbind()
767
 
        self.assertEqual(None, branch.get_master_branch())
768
 
 
769
 
    def test_bind_clears_cached_master_branch(self):
770
 
        """b.bind clears any cached value of b.get_master_branch."""
771
 
        master1 = self.make_branch('master1')
772
 
        master2 = self.make_branch('master2')
773
 
        branch = self.make_branch('branch')
774
 
        try:
775
 
            branch.bind(master1)
776
 
        except errors.UpgradeRequired:
777
 
            raise tests.TestNotApplicable('Format does not support binding')
778
 
        self.addCleanup(branch.lock_write().unlock)
779
 
        self.assertNotEqual(None, branch.get_master_branch())
780
 
        branch.bind(master2)
781
 
        self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
782
 
                                                    branch.get_master_branch().base))
783
 
 
784
 
    def test_set_bound_location_clears_cached_master_branch(self):
785
 
        """b.set_bound_location clears any cached value of b.get_master_branch.
786
 
        """
787
 
        master1 = self.make_branch('master1')
788
 
        master2 = self.make_branch('master2')
789
 
        branch = self.make_branch('branch')
790
 
        try:
791
 
            branch.bind(master1)
792
 
        except errors.UpgradeRequired:
793
 
            raise tests.TestNotApplicable('Format does not support binding')
794
 
        self.addCleanup(branch.lock_write().unlock)
795
 
        self.assertNotEqual(None, branch.get_master_branch())
796
 
        branch.set_bound_location(self.get_url('master2'))
797
 
        self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
798
 
                                                    branch.get_master_branch().base))
799
 
 
800
 
 
801
 
class TestStrict(per_branch.TestCaseWithBranch):
802
 
 
803
 
    def test_strict_history(self):
804
 
        tree1 = self.make_branch_and_tree('tree1')
805
 
        try:
806
 
            tree1.branch.set_append_revisions_only(True)
807
 
        except errors.UpgradeRequired:
808
 
            raise tests.TestSkipped('Format does not support strict history')
809
 
        tree1.commit('empty commit')
810
 
        tree2 = tree1.controldir.sprout('tree2').open_workingtree()
811
 
        tree2.commit('empty commit 2')
812
 
        tree1.pull(tree2.branch)
813
 
        tree1.commit('empty commit 3')
814
 
        tree2.commit('empty commit 4')
815
 
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
816
 
        tree2.merge_from_branch(tree1.branch)
817
 
        tree2.commit('empty commit 5')
818
 
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
819
 
                          tree2.branch)
820
 
        tree3 = tree1.controldir.sprout('tree3').open_workingtree()
821
 
        tree3.merge_from_branch(tree2.branch)
822
 
        tree3.commit('empty commit 6')
823
 
        tree2.pull(tree3.branch)
824
 
 
825
 
 
826
 
class TestIgnoreFallbacksParameter(per_branch.TestCaseWithBranch):
827
 
 
828
 
    def make_branch_with_fallback(self):
829
 
        fallback = self.make_branch('fallback')
830
 
        if not fallback._format.supports_stacking():
831
 
            raise tests.TestNotApplicable("format does not support stacking")
832
 
        stacked = self.make_branch('stacked')
833
 
        stacked.set_stacked_on_url(fallback.base)
834
 
        return stacked
835
 
 
836
 
    def test_fallbacks_not_opened(self):
837
 
        stacked = self.make_branch_with_fallback()
838
 
        self.get_transport('').rename('fallback', 'moved')
839
 
        reopened_dir = controldir.ControlDir.open(stacked.base)
840
 
        reopened = reopened_dir.open_branch(ignore_fallbacks=True)
841
 
        self.assertEqual([], reopened.repository._fallback_repositories)
842
 
 
843
 
    def test_fallbacks_are_opened(self):
844
 
        stacked = self.make_branch_with_fallback()
845
 
        reopened_dir = controldir.ControlDir.open(stacked.base)
846
 
        reopened = reopened_dir.open_branch(ignore_fallbacks=False)
847
 
        self.assertLength(1, reopened.repository._fallback_repositories)
848
 
 
849
 
 
850
 
class TestBranchControlComponent(per_branch.TestCaseWithBranch):
851
 
    """Branch implementations adequately implement ControlComponent."""
852
 
 
853
 
    def test_urls(self):
854
 
        br = self.make_branch('branch')
855
 
        self.assertIsInstance(br.user_url, str)
856
 
        self.assertEqual(br.user_url, br.user_transport.base)
857
 
        self.assertEqual(br.control_url, br.control_transport.base)
858
 
 
859
 
 
860
 
class FakeShelfCreator(object):
861
 
 
862
 
    def __init__(self, branch):
863
 
        self.branch = branch
864
 
 
865
 
    def write_shelf(self, shelf_file, message=None):
866
 
        tree = self.branch.repository.revision_tree(revision.NULL_REVISION)
867
 
        with transform.TransformPreview(tree) as tt:
868
 
            shelf.ShelfCreator._write_shelf(
869
 
                shelf_file, tt, revision.NULL_REVISION)
870
 
 
871
 
 
872
 
@contextlib.contextmanager
873
 
def skip_if_storing_uncommitted_unsupported():
874
 
    try:
875
 
        yield
876
 
    except errors.StoringUncommittedNotSupported:
877
 
        raise tests.TestNotApplicable('Cannot store uncommitted changes.')
878
 
 
879
 
 
880
 
class TestUncommittedChanges(per_branch.TestCaseWithBranch):
881
 
 
882
 
    def setUp(self):
883
 
        super(TestUncommittedChanges, self).setUp()
884
 
        if not self.branch_format.supports_store_uncommitted():
885
 
            raise tests.TestNotApplicable(
886
 
                'Branch format does not support store_uncommitted')
887
 
 
888
 
    def bind(self, branch, master):
889
 
        try:
890
 
            branch.bind(master)
891
 
        except errors.UpgradeRequired:
892
 
            raise tests.TestNotApplicable('Branch cannot be bound.')
893
 
 
894
 
    def test_store_uncommitted(self):
895
 
        tree = self.make_branch_and_tree('b')
896
 
        branch = tree.branch
897
 
        creator = FakeShelfCreator(branch)
898
 
        with skip_if_storing_uncommitted_unsupported():
899
 
            self.assertIs(None, branch.get_unshelver(tree))
900
 
        branch.store_uncommitted(creator)
901
 
        self.assertIsNot(None, branch.get_unshelver(tree))
902
 
 
903
 
    def test_store_uncommitted_bound(self):
904
 
        tree = self.make_branch_and_tree('b')
905
 
        branch = tree.branch
906
 
        master = self.make_branch('master')
907
 
        self.bind(branch, master)
908
 
        creator = FakeShelfCreator(tree.branch)
909
 
        self.assertIs(None, tree.branch.get_unshelver(tree))
910
 
        self.assertIs(None, master.get_unshelver(tree))
911
 
        tree.branch.store_uncommitted(creator)
912
 
        self.assertIsNot(None, master.get_unshelver(tree))
913
 
 
914
 
    def test_store_uncommitted_already_stored(self):
915
 
        branch = self.make_branch('b')
916
 
        with skip_if_storing_uncommitted_unsupported():
917
 
            branch.store_uncommitted(FakeShelfCreator(branch))
918
 
        self.assertRaises(errors.ChangesAlreadyStored,
919
 
                          branch.store_uncommitted, FakeShelfCreator(branch))
920
 
 
921
 
    def test_store_uncommitted_none(self):
922
 
        branch = self.make_branch('b')
923
 
        with skip_if_storing_uncommitted_unsupported():
924
 
            branch.store_uncommitted(FakeShelfCreator(branch))
925
 
        branch.store_uncommitted(None)
926
 
        self.assertIs(None, branch.get_unshelver(None))
927
 
 
928
 
    def test_get_unshelver(self):
929
 
        tree = self.make_branch_and_tree('tree')
930
 
        tree.commit('')
931
 
        self.build_tree_contents([('tree/file', b'contents1')])
932
 
        tree.add('file')
933
 
        with skip_if_storing_uncommitted_unsupported():
934
 
            tree.store_uncommitted()
935
 
        unshelver = tree.branch.get_unshelver(tree)
936
 
        self.assertIsNot(None, unshelver)
937
 
 
938
 
    def test_get_unshelver_bound(self):
939
 
        tree = self.make_branch_and_tree('tree')
940
 
        tree.commit('')
941
 
        self.build_tree_contents([('tree/file', b'contents1')])
942
 
        tree.add('file')
943
 
        with skip_if_storing_uncommitted_unsupported():
944
 
            tree.store_uncommitted()
945
 
        branch = self.make_branch('branch')
946
 
        self.bind(branch, tree.branch)
947
 
        unshelver = branch.get_unshelver(tree)
948
 
        self.assertIsNot(None, unshelver)
949
 
 
950
 
 
951
 
class TestFormatMetadata(per_branch.TestCaseWithBranch):
952
 
 
953
 
    def test_stores_revno(self):
954
 
        self.assertIn(self.branch_format.stores_revno(), (True, False))
 
384
        self.branch.set_push_location('foo')
 
385
        self.assertFileEqual("[%s]\n"
 
386
                             "push_location = foo" % os.getcwdu(),
 
387
                             '.bazaar/branches.conf')
 
388
 
 
389
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
390
    # recursive section - that is, it appends the branch name.