1
# (C) 2005 Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
19
import os

from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
from bzrlib.clone import copy_branch
from bzrlib.commit import commit
import bzrlib.errors as errors
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
import bzrlib.gpg
from bzrlib.selftest import TestCase, TestCaseInTempDir
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
from bzrlib.trace import mutter
import bzrlib.transactions as transactions
from bzrlib.revision import NULL_REVISION
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
 
32
# doesn't request any files that could have been avoided, by 
 
 
33
# hooking into the Transport.
 
 
35
class TestBranch(TestCaseInTempDir):
    """Tests exercising Branch operations on branches created in a temp dir."""

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = Branch.initialize(".")
        br.append_revision("rev1")
        self.assertEqual(br.revision_history(), ["rev1",])
        br.append_revision("rev2", "rev3")
        self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        from bzrlib.fetch import Fetcher

        # NOTE(review): nothing visible here creates the 'b1'/'b2'
        # directories -- confirm Branch.initialize does so itself.
        b1 = Branch.initialize('b1')
        b2 = Branch.initialize('b2')
        # Close the handle explicitly instead of relying on garbage
        # collection to flush the content before it is added.
        foo_file = open(os.path.join('b1', 'foo'), 'w')
        try:
            foo_file.write('hello')
        finally:
            foo_file.close()
        b1.add(['foo'], ['foo-id'])
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

        f = Fetcher(from_branch=b1, to_branch=b2)
        eq = self.assertEqual

        eq(f.last_revision, 'revision-1')

        # Called only to check that the revision can be loaded from b2.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        eq(tree.get_file_text('foo-id'), 'hello')

    def test_revision_tree(self):
        """revision_tree accepts a revision id, None and NULL_REVISION."""
        b1 = Branch.initialize('.')
        b1.commit('lala!', rev_id='revision-1', allow_pointless=True)
        # The result is not inspected; the call itself must succeed.
        b1.revision_tree('revision-1')
        tree = b1.revision_tree(None)
        self.assertEqual(len(tree.list_files()), 0)
        tree = b1.revision_tree(NULL_REVISION)
        self.assertEqual(len(tree.list_files()), 0)

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        br_a = Branch.initialize("a")
        b_file = open('a/b', 'wb')
        try:
            b_file.write('b')
        finally:
            b_file.close()
        # NOTE(review): no add() of 'b' is visible before this commit --
        # confirm the file is actually versioned by it.
        commit(br_a, "silly commit", rev_id='A')
        br_b = Branch.initialize("b")
        # Callers unpack the result as ``br_a, br_b``.
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision,
                          br_a.revision_history()[0])
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        br_b.get_revision(br_a.revision_history()[0])
        tree = br_b.revision_tree(br_a.revision_history()[0])
        for file_id in tree:
            if tree.inventory[file_id].kind == "file":
                # Reading the text checks that it was really copied.
                tree.get_file(file_id).read()

    def test_copy_branch(self):
        """copy_branch with a basis branch reproduces the source history."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        # NOTE(review): no add() of 'one'/'two' is visible before the
        # commits below -- confirm the files end up versioned.
        self.build_tree(['a/', 'a/one'])
        br_a = Branch.initialize('a')
        br_a.commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = Branch.initialize('.')
        branch.add_pending_merge('non:existent@rev--ision--0--2')
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated at present. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    def test_bad_revision(self):
        """get_revision(None) raises InvalidRevisionId."""
        branch = Branch.initialize('.')
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)

    # compare the gpg-to-sign info for a commit with a ghost and
    #     an identical tree without a ghost
    # fetch missing should rewrite the TOC of weaves to list newly available parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = Branch.initialize('.')
        self.assertEqual(b.pending_merges(), [])
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not duplicate the entry.
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEqual(b.pending_merges(),
                         ['foo@azkhazan-123123-abcabc',
                          'wibble@fofof--20050401--1928390812'])
        b.commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEqual(len(rev.parent_ids), 2)
        self.assertEqual(rev.parent_ids[0],
                         'foo@azkhazan-123123-abcabc')
        self.assertEqual(rev.parent_ids[1],
                         'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEqual(b.pending_merges(), [])

    def test_sign_existing_revision(self):
        """sign_revision stores the revision's short testament as its sig."""
        from bzrlib.testament import Testament

        branch = Branch.initialize('.')
        branch.commit("base", allow_pointless=True, rev_id='A')
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """store_revision_signature saves the text under the 'sig' suffix."""
        branch = Branch.initialize('.')
        # NOTE(review): 'FOO' and 'A' are taken from the assertion below;
        # the plaintext-then-revision-id order is assumed -- confirm.
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        """_rel_controlfilename maps '%' to '.bzr/%25'."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        """_rel_controlfilename('') gives the bare control directory."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
 
195
class TestRemote(TestCaseWithWebserver):
    """Branch.open_containing against URLs from get_remote_url."""

    def test_open_containing(self):
        """open_containing fails with no branch, then reports the relpath."""
        probe_paths = ('', 'g/p/q')

        # Before any branch exists, both URLs must be rejected.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))

        created = Branch.initialize('.')

        # Once the branch exists, the remainder of the URL is the relpath.
        for path in probe_paths:
            found, remainder = Branch.open_containing(
                self.get_remote_url(path))
            self.assertEqual(path, remainder)
 
 
208
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
 
209
#         >>> from bzrlib.commit import commit
 
 
210
#         >>> bzrlib.trace.silent = True
 
 
211
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
 
214
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
 
215
#         >>> br2 = ScratchBranch()
 
 
216
#         >>> br2.update_revisions(br1)
 
 
218
#         Added 1 inventories.
 
 
220
#         >>> br2.revision_history()
 
 
222
#         >>> br2.update_revisions(br1)
 
 
224
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
 
227
class InstrumentedTransaction(object):
    """Stand-in transaction that records the calls made on it.

    ``calls`` is a list of the names of the methods invoked so far, in
    order, so a test can assert on it afterwards.
    """

    def __init__(self):
        self.calls = []

    def finish(self):
        """Record that finish() was called."""
        self.calls.append('finish')
 
 
236
class TestDecorator(object):
    """Lockable stand-in used to observe the locking decorators.

    Each lock operation appends a short tag to ``_calls`` ('lr' for
    lock_read, 'lw' for lock_write, 'ul' for unlock) so tests can check
    which operations ran around a decorated method, and in what order.
    """

    def __init__(self):
        self._calls = []

    def lock_read(self):
        self._calls.append('lr')

    def lock_write(self):
        self._calls.append('lw')

    def unlock(self):
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside a read-locked call."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside a read-locked call."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside a write-locked call."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside a write-locked call."""
        raise RuntimeError
 
 
267
class TestDecorators(TestCase):
    """Call TestDecorator's methods and check the lock calls it recorded."""

    def test_needs_read_lock(self):
        """do_with_read returns 1 and records 'lr' followed by 'ul'."""
        subject = TestDecorator()
        outcome = subject.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], subject._calls)

    def test_excepts_in_read_lock(self):
        """except_with_read raises, and 'ul' is still recorded."""
        subject = TestDecorator()
        failing_call = subject.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], subject._calls)

    def test_needs_write_lock(self):
        """do_with_write returns 2 and records 'lw' followed by 'ul'."""
        subject = TestDecorator()
        outcome = subject.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], subject._calls)

    def test_excepts_in_write_lock(self):
        """except_with_write raises, and 'ul' is still recorded."""
        subject = TestDecorator()
        failing_call = subject.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], subject._calls)
 
 
290
class TestBranchTransaction(TestCaseInTempDir):
    """Tests for how a Branch sets, exposes and finishes its transaction."""

    def setUp(self):
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.failUnless(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """_set_transaction accepts a transaction when none is active."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an active one is a LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """_finish_transaction with nothing active is a LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """unlock() must call finish() on the active transaction."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        self.branch._transaction = transaction
        # Without this unlock nothing could have called finish().
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read installs a ReadOnlyTransaction."""
        self.branch.lock_read()
        try:
            self.failUnless(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write installs a PassThroughTransaction."""
        self.branch.lock_write()
        try:
            # cannot use get_transaction as its magic
            self.failUnless(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()
 
 
339
class TestBranchPushLocations(TestCaseInTempDir):
    """Tests for reading and writing a branch's push location."""

    def setUp(self):
        super(TestBranchPushLocations, self).setUp()
        self.branch = Branch.initialize('.')

    def test_get_push_location_unset(self):
        """With nothing configured the push location is None."""
        self.assertEqual(None, self.branch.get_push_location())

    def test_get_push_location_exact(self):
        """A section named after the current directory supplies the location."""
        self.build_tree(['.bazaar/'])
        conf_file = open('.bazaar/branches.conf', 'wt')
        try:
            print >> conf_file, ("[%s]\n"
                                 "push_location=foo" % os.getcwdu())
        finally:
            # Close explicitly so the content is flushed before it is read.
            conf_file.close()
        self.assertEqual("foo", self.branch.get_push_location())

    def test_set_push_location(self):
        """set_push_location writes a section for the current directory."""
        self.branch.set_push_location('foo')
        self.assertFileEqual("[%s]\n"
                             "push_location = foo" % os.getcwdu(),
                             '.bazaar/branches.conf')
 
 
361
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
 
362
    # recursive section - that is, it appends the branch name.