# (C) 2005 Canonical Ltd

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os

from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
from bzrlib.clone import copy_branch
from bzrlib.commit import commit
import bzrlib.errors as errors
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
import bzrlib.gpg
from bzrlib.selftest import TestCase, TestCaseInTempDir
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
from bzrlib.trace import mutter
import bzrlib.transactions as transactions
from bzrlib.revision import NULL_REVISION
# TODO: Make a branch using basis branch, and check that it
# doesn't request any files that could have been avoided, by
# hooking into the Transport.
35
class TestBranch(TestCaseInTempDir):
    """Tests of basic Branch operations, run against branches in a temp dir."""

    def _write_and_close(self, path, mode, content):
        """Write content to path, closing the handle before returning."""
        out = open(path, mode)
        try:
            out.write(content)
        finally:
            out.close()

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = Branch.initialize(".")
        br.append_revision("rev1")
        self.assertEqual(br.revision_history(), ["rev1"])
        # Several revision ids can be appended in a single call.
        br.append_revision("rev2", "rev3")
        self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation.

        Commits one file in b1, fetches into b2, then reads the revision
        and the file text back from b2.
        """
        from bzrlib.fetch import Fetcher
        b1 = Branch.initialize('b1')
        b2 = Branch.initialize('b2')
        self._write_and_close(os.path.join('b1', 'foo'), 'w', 'hello')
        b1.add(['foo'], ['foo-id'])
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

        f = Fetcher(from_branch=b1, to_branch=b2)
        eq = self.assertEqual
        eq(f.last_revision, 'revision-1')

        # The result is not inspected; the call only has to succeed.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        eq(tree.get_file_text('foo-id'), 'hello')

    def test_revision_tree(self):
        """revision_tree accepts a real revision id, None and NULL_REVISION."""
        b1 = Branch.initialize('.')
        b1.commit('lala!', rev_id='revision-1', allow_pointless=True)
        # Only checks that building the tree does not fail.
        b1.revision_tree('revision-1')
        # Both spellings of "no revision" must give a tree without files.
        for revision_id in (None, NULL_REVISION):
            tree = b1.revision_tree(revision_id)
            self.assertEqual(len(tree.list_files()), 0)

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        br_a = Branch.initialize("a")
        self._write_and_close('a/b', 'wb', 'b')
        # NOTE(review): the visible code writes a/b but never adds it to
        # br_a before committing -- confirm whether an add belongs here.
        commit(br_a, "silly commit", rev_id='A')
        br_b = Branch.initialize("b")
        # Callers unpack the result as ``br_a, br_b``.
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        first_rev = br_a.revision_history()[0]
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision, first_rev)
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        br_b.get_revision(br_a.revision_history()[0])
        tree = br_b.revision_tree(br_a.revision_history()[0])
        # NOTE(review): the statement binding file_id was absent; iterating
        # the inventory is assumed to yield file ids -- confirm.
        for file_id in tree.inventory:
            if tree.inventory[file_id].kind == "file":
                tree.get_file(file_id).read()

    def test_copy_branch(self):
        """copy_branch with a basis branch reproduces the source history."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        self.build_tree(['a/', 'a/one'])
        br_a = Branch.initialize('a')
        br_a.commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = Branch.initialize('.')
        branch.add_pending_merge('non:existent@rev--ision--0--2')
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated at present (noted by rbc 20051003).
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    def test_bad_revision(self):
        """get_revision(None) raises InvalidRevisionId."""
        branch = Branch.initialize('.')
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)

    # compare the gpg-to-sign info for a commit with a ghost and
    # an identical tree without a ghost
    # fetch missing should rewrite the TOC of weaves to list newly available parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = Branch.initialize('.')
        self.assertEqual(b.pending_merges(), [])
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not create a duplicate entry.
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEqual(b.pending_merges(),
                         ['foo@azkhazan-123123-abcabc',
                          'wibble@fofof--20050401--1928390812'])
        b.commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEqual(len(rev.parent_ids), 2)
        self.assertEqual(rev.parent_ids[0],
                         'foo@azkhazan-123123-abcabc')
        self.assertEqual(rev.parent_ids[1],
                         'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEqual(b.pending_merges(), [])

    def test_sign_existing_revision(self):
        """Signing revision 'A' stores its short testament text as the signature."""
        branch = Branch.initialize('.')
        branch.commit("base", allow_pointless=True, rev_id='A')
        from bzrlib.testament import Testament
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """A stored signature can be read back from the revision store."""
        branch = Branch.initialize('.')
        # NOTE(review): the tail of this call was absent; the text 'FOO' and
        # the revision id 'A' are taken from the assertion below -- confirm
        # the argument order against Branch.store_revision_signature.
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        """_rel_controlfilename escapes '%' as '%25' under .bzr."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        """_rel_controlfilename('') is the control directory itself."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr', branch._rel_controlfilename(''))

    def test_nicks(self):
        """Branch nicknames"""
        branch = Branch.initialize('bzr.dev')
        self.assertEqual(branch.nick, 'bzr.dev')
        # Without an explicit nick, the nick follows the directory name.
        os.rename('bzr.dev', 'bzr.ab')
        branch = Branch.open('bzr.ab')
        self.assertEqual(branch.nick, 'bzr.ab')
        # NOTE(review): the nick is assigned twice with the same value;
        # presumably deliberate (re-setting must work) -- confirm.
        branch.nick = "Aaron's branch"
        branch.nick = "Aaron's branch"
        self.assertTrue(os.path.exists(branch.controlfilename("branch.conf")))
        self.assertEqual(branch.nick, "Aaron's branch")
        # An explicitly set nick survives renaming the directory.
        os.rename('bzr.ab', 'integration')
        branch = Branch.open('integration')
        self.assertEqual(branch.nick, "Aaron's branch")
        branch.nick = u"\u1234"
        self.assertEqual(branch.nick, u"\u1234")
213
class TestRemote(TestCaseWithWebserver):
    """Branch.open_containing exercised through the test web server's URLs."""

    def test_open_containing(self):
        """open_containing fails with no branch, then reports the relative path."""
        paths = ('', 'g/p/q')
        # Before a branch exists, neither URL resolves to one.
        for path in paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))
        Branch.initialize('.')
        # Once it exists, the path below the branch comes back as relpath.
        for path in paths:
            found = Branch.open_containing(self.get_remote_url(path))
            _branch, relpath = found
            self.assertEqual(path, relpath)
# TODO: rewrite this as a regular unittest, without relying on the displayed output
# >>> from bzrlib.commit import commit
# >>> bzrlib.trace.silent = True
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
# >>> br2 = ScratchBranch()
# >>> br2.update_revisions(br1)
# Added 1 inventories.
# >>> br2.revision_history()
# >>> br2.update_revisions(br1)
# >>> br1.text_store.total_size() == br2.text_store.total_size()
245
class InstrumentedTransaction(object):
    """Stand-in transaction that records the calls made on it.

    NOTE(review): only the class header and the ``append('finish')``
    statement were present; the method name ``finish`` and the constructor
    are inferred from how the class is used in this file (built with no
    arguments, then ``calls`` compared with ``['finish']``) -- confirm.
    """

    def finish(self):
        """Record that the transaction was finished."""
        self.calls.append('finish')

    def __init__(self):
        """Start with an empty call log."""
        # Names of the methods invoked so far, in call order.
        self.calls = []
254
class TestDecorator(object):
    """Fake lockable object that logs lock calls, for the decorator tests.

    NOTE(review): the constructor, the ``lock_read``/``unlock`` headers, the
    decorators and the method bodies were absent. They are reconstructed
    from the expectations in TestDecorators (return values 1 and 2,
    RuntimeError, call logs ['lr', 'ul'] and ['lw', 'ul']) and from the
    needs_read_lock/needs_write_lock import -- confirm against the original.
    """

    def __init__(self):
        """Start with an empty call log."""
        # 'lr', 'lw' and 'ul' markers, in the order the calls happened.
        self._calls = []

    def lock_read(self):
        """Log a read-lock request."""
        self._calls.append('lr')

    def lock_write(self):
        """Log a write-lock request."""
        self._calls.append('lw')

    def unlock(self):
        """Log an unlock request."""
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside a read-locked call."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside a read-locked call."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside a write-locked call."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside a write-locked call."""
        raise RuntimeError
285
class TestDecorators(TestCase):
    """Check the lock/unlock calls made around decorated TestDecorator methods."""

    def test_needs_read_lock(self):
        """A read-decorated call returns its value between 'lr' and 'ul'."""
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """'ul' is still logged when a read-decorated call raises."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """A write-decorated call returns its value between 'lw' and 'ul'."""
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """'ul' is still logged when a write-decorated call raises."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
308
class TestBranchTransaction(TestCaseInTempDir):
    """Tests of how a Branch sets, exposes and finishes its transaction."""

    def setUp(self):
        """Create a fresh branch in the test directory for every test."""
        # NOTE(review): the ``def`` line was absent; ``setUp`` is taken from
        # the super() call below.
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.assertTrue(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction on a branch that has none must not raise."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an existing one raises LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction is set raises LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """Unlocking the branch finishes the transaction it holds."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        # Swap in the recording transaction so the finish call can be seen.
        self.branch._transaction = transaction
        # NOTE(review): the unlock call was absent from the visible code; it
        # is the operation this test is named after.
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """A read lock installs a ReadOnlyTransaction."""
        self.branch.lock_read()
        try:
            self.assertTrue(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """A write lock installs a PassThroughTransaction."""
        self.branch.lock_write()
        try:
            # cannot use get_transaction as its magic
            self.assertTrue(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()
357
class TestBranchPushLocations(TestCaseInTempDir):
    """Tests of reading and writing a branch's push location."""

    def setUp(self):
        """Create a fresh branch in the test directory for every test."""
        # NOTE(review): the ``def`` line was absent; ``setUp`` is taken from
        # the super() call below.
        super(TestBranchPushLocations, self).setUp()
        self.branch = Branch.initialize('.')

    def test_get_push_location_unset(self):
        """With nothing configured, get_push_location returns None."""
        self.assertEqual(None, self.branch.get_push_location())

    def test_get_push_location_exact(self):
        """A section named after the branch directory supplies the location."""
        self.build_tree(['.bazaar/'])
        conf = open('.bazaar/branches.conf', 'wt')
        try:
            # NOTE(review): the right-hand operand of ``%`` was absent;
            # os.getcwdu() mirrors test_set_push_location -- confirm.
            print >> conf, ("[%s]\n"
                            "push_location=foo" %
                            os.getcwdu())
        finally:
            # Close explicitly so the config is on disk before it is read.
            conf.close()
        self.assertEqual("foo", self.branch.get_push_location())

    def test_set_push_location(self):
        """set_push_location writes a section for the current directory."""
        self.branch.set_push_location('foo')
        self.assertFileEqual("[%s]\n"
                             "push_location = foo" % os.getcwdu(),
                             '.bazaar/branches.conf')
# TODO RBC 20051029 test getting a push location from a branch in a
# recursive section - that is, it appends the branch name.