1
# (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
from bzrlib.clone import copy_branch
21
from bzrlib.commit import commit
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
27
from bzrlib.trace import mutter
28
import bzrlib.transactions as transactions
30
# TODO: Make a branch using basis branch, and check that it
31
# doesn't request any files that could have been avoided, by
32
# hooking into the Transport.
34
class TestBranch(TestCaseInTempDir):
36
def test_append_revisions(self):
37
"""Test appending more than one revision"""
38
br = Branch.initialize(".")
39
br.append_revision("rev1")
40
self.assertEquals(br.revision_history(), ["rev1",])
41
br.append_revision("rev2", "rev3")
42
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
44
def test_fetch_revisions(self):
45
"""Test fetch-revision operation."""
46
from bzrlib.fetch import Fetcher
49
b1 = Branch.initialize('b1')
50
b2 = Branch.initialize('b2')
51
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
52
b1.add(['foo'], ['foo-id'])
53
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
56
f = Fetcher(from_branch=b1, to_branch=b2)
57
eq = self.assertEquals
59
eq(f.last_revision, 'revision-1')
61
rev = b2.get_revision('revision-1')
62
tree = b2.revision_tree('revision-1')
63
eq(tree.get_file_text('foo-id'), 'hello')
65
def get_unbalanced_branch_pair(self):
66
"""Return two branches, a and b, with one file in a."""
68
br_a = Branch.initialize("a")
69
file('a/b', 'wb').write('b')
71
commit(br_a, "silly commit", rev_id='A')
73
br_b = Branch.initialize("b")
76
def get_balanced_branch_pair(self):
77
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
78
br_a, br_b = self.get_unbalanced_branch_pair()
79
br_a.push_stores(br_b)
82
def test_push_stores(self):
83
"""Copy the stores from one branch to another"""
84
br_a, br_b = self.get_unbalanced_branch_pair()
85
# ensure the revision is missing.
86
self.assertRaises(NoSuchRevision, br_b.get_revision,
87
br_a.revision_history()[0])
88
br_a.push_stores(br_b)
89
# check that b now has all the data from a's first commit.
90
rev = br_b.get_revision(br_a.revision_history()[0])
91
tree = br_b.revision_tree(br_a.revision_history()[0])
93
if tree.inventory[file_id].kind == "file":
94
tree.get_file(file_id).read()
97
def test_copy_branch(self):
98
"""Copy the stores from one branch to another"""
99
br_a, br_b = self.get_balanced_branch_pair()
100
commit(br_b, "silly commit")
102
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
103
self.assertEqual(br_a.revision_history(), br_c.revision_history())
105
def test_copy_partial(self):
106
"""Copy only part of the history of a branch."""
107
self.build_tree(['a/', 'a/one'])
108
br_a = Branch.initialize('a')
110
br_a.commit('commit one', rev_id='u@d-1')
111
self.build_tree(['a/two'])
113
br_a.commit('commit two', rev_id='u@d-2')
114
br_b = copy_branch(br_a, 'b', revision='u@d-1')
115
self.assertEqual(br_b.last_revision(), 'u@d-1')
116
self.assertTrue(os.path.exists('b/one'))
117
self.assertFalse(os.path.exists('b/two'))
120
def test_record_initial_ghost_merge(self):
121
"""A pending merge with no revision present is still a merge."""
122
branch = Branch.initialize('.')
123
branch.add_pending_merge('non:existent@rev--ision--0--2')
124
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
125
rev = branch.get_revision(branch.last_revision())
126
self.assertEqual(len(rev.parent_ids), 1)
127
# parent_sha1s is not populated now, WTF. rbc 20051003
128
self.assertEqual(len(rev.parent_sha1s), 0)
129
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
132
# compare the gpg-to-sign info for a commit with a ghost and
133
# an identical tree without a ghost
134
# fetch missing should rewrite the TOC of weaves to list newly available parents.
136
def test_pending_merges(self):
137
"""Tracking pending-merged revisions."""
138
b = Branch.initialize('.')
140
self.assertEquals(b.pending_merges(), [])
141
b.add_pending_merge('foo@azkhazan-123123-abcabc')
142
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
143
b.add_pending_merge('foo@azkhazan-123123-abcabc')
144
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
145
b.add_pending_merge('wibble@fofof--20050401--1928390812')
146
self.assertEquals(b.pending_merges(),
147
['foo@azkhazan-123123-abcabc',
148
'wibble@fofof--20050401--1928390812'])
149
b.commit("commit from base with two merges")
150
rev = b.get_revision(b.revision_history()[0])
151
self.assertEquals(len(rev.parent_ids), 2)
152
self.assertEquals(rev.parent_ids[0],
153
'foo@azkhazan-123123-abcabc')
154
self.assertEquals(rev.parent_ids[1],
155
'wibble@fofof--20050401--1928390812')
156
# list should be cleared when we do a commit
157
self.assertEquals(b.pending_merges(), [])
159
def test_sign_existing_revision(self):
160
branch = Branch.initialize('.')
161
branch.commit("base", allow_pointless=True, rev_id='A')
162
from bzrlib.testament import Testament
163
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
164
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
165
branch.revision_store.get('A', 'sig').read())
167
def test_store_signature(self):
168
branch = Branch.initialize('.')
169
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
171
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
174
class TestRemote(TestCaseWithWebserver):
176
def test_open_containing(self):
177
self.assertRaises(NotBranchError, Branch.open_containing,
178
self.get_remote_url(''))
179
self.assertRaises(NotBranchError, Branch.open_containing,
180
self.get_remote_url('g/p/q'))
181
b = Branch.initialize('.')
182
branch, relpath = Branch.open_containing(self.get_remote_url(''))
183
self.assertEqual('', relpath)
184
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
185
self.assertEqual('g/p/q', relpath)
187
# TODO: rewrite this as a regular unittest, without relying on the displayed output
188
# >>> from bzrlib.commit import commit
189
# >>> bzrlib.trace.silent = True
190
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
193
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
194
# >>> br2 = ScratchBranch()
195
# >>> br2.update_revisions(br1)
197
# Added 1 inventories.
199
# >>> br2.revision_history()
201
# >>> br2.update_revisions(br1)
203
# >>> br1.text_store.total_size() == br2.text_store.total_size()
206
class InstrumentedTransaction(object):
209
self.calls.append('finish')
215
class TestDecorator(object):
221
self._calls.append('lr')
223
def lock_write(self):
224
self._calls.append('lw')
227
self._calls.append('ul')
230
def do_with_read(self):
234
def except_with_read(self):
238
def do_with_write(self):
242
def except_with_write(self):
246
class TestDecorators(TestCase):
248
def test_needs_read_lock(self):
249
branch = TestDecorator()
250
self.assertEqual(1, branch.do_with_read())
251
self.assertEqual(['lr', 'ul'], branch._calls)
253
def test_excepts_in_read_lock(self):
254
branch = TestDecorator()
255
self.assertRaises(RuntimeError, branch.except_with_read)
256
self.assertEqual(['lr', 'ul'], branch._calls)
258
def test_needs_write_lock(self):
259
branch = TestDecorator()
260
self.assertEqual(2, branch.do_with_write())
261
self.assertEqual(['lw', 'ul'], branch._calls)
263
def test_excepts_in_write_lock(self):
264
branch = TestDecorator()
265
self.assertRaises(RuntimeError, branch.except_with_write)
266
self.assertEqual(['lw', 'ul'], branch._calls)
269
class TestBranchTransaction(TestCaseInTempDir):
272
super(TestBranchTransaction, self).setUp()
273
self.branch = Branch.initialize('.')
275
def test_default_get_transaction(self):
276
"""branch.get_transaction on a new branch should give a PassThrough."""
277
self.failUnless(isinstance(self.branch.get_transaction(),
278
transactions.PassThroughTransaction))
280
def test__set_new_transaction(self):
281
self.branch._set_transaction(transactions.ReadOnlyTransaction())
283
def test__set_over_existing_transaction_raises(self):
284
self.branch._set_transaction(transactions.ReadOnlyTransaction())
285
self.assertRaises(errors.LockError,
286
self.branch._set_transaction,
287
transactions.ReadOnlyTransaction())
289
def test_finish_no_transaction_raises(self):
290
self.assertRaises(errors.LockError, self.branch._finish_transaction)
292
def test_finish_readonly_transaction_works(self):
293
self.branch._set_transaction(transactions.ReadOnlyTransaction())
294
self.branch._finish_transaction()
295
self.assertEqual(None, self.branch._transaction)
297
def test_unlock_calls_finish(self):
298
self.branch.lock_read()
299
transaction = InstrumentedTransaction()
300
self.branch._transaction = transaction
302
self.assertEqual(['finish'], transaction.calls)
304
def test_lock_read_acquires_ro_transaction(self):
305
self.branch.lock_read()
306
self.failUnless(isinstance(self.branch.get_transaction(),
307
transactions.ReadOnlyTransaction))
310
def test_lock_write_acquires_passthrough_transaction(self):
311
self.branch.lock_write()
312
# cannot use get_transaction as its magic
313
self.failUnless(isinstance(self.branch._transaction,
314
transactions.PassThroughTransaction))