1
# (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
22
import bzrlib.branch as branch
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
from bzrlib.clone import copy_branch
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (NoSuchRevision,
29
UninitializableFormat,
33
from bzrlib.osutils import getcwd
34
from bzrlib.tests import TestCase, TestCaseInTempDir, TestSkipped
35
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
36
from bzrlib.trace import mutter
37
import bzrlib.transactions as transactions
38
from bzrlib.transport import get_transport
39
from bzrlib.revision import NULL_REVISION
41
# TODO: Make a branch using basis branch, and check that it
42
# doesn't request any files that could have been avoided, by
43
# hooking into the Transport.
46
class TestCaseWithBranch(TestCaseInTempDir):
49
super(TestCaseWithBranch, self).setUp()
53
if self.branch is None:
54
self.branch = self.make_branch('.')
57
def make_branch(self, relpath):
59
return self.branch_format.initialize(relpath)
60
except UninitializableFormat:
61
raise TestSkipped("Format %s is not initializable.")
64
class TestBranch(TestCaseWithBranch):
66
def test_append_revisions(self):
67
"""Test appending more than one revision"""
68
br = self.get_branch()
69
br.append_revision("rev1")
70
self.assertEquals(br.revision_history(), ["rev1",])
71
br.append_revision("rev2", "rev3")
72
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
74
def test_fetch_revisions(self):
75
"""Test fetch-revision operation."""
76
from bzrlib.fetch import Fetcher
79
b1 = self.make_branch('b1')
80
b2 = self.make_branch('b2')
81
file('b1/foo', 'w').write('hello')
82
b1.working_tree().add(['foo'], ['foo-id'])
83
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
86
f = Fetcher(from_branch=b1, to_branch=b2)
87
eq = self.assertEquals
89
eq(f.last_revision, 'revision-1')
91
rev = b2.get_revision('revision-1')
92
tree = b2.revision_tree('revision-1')
93
eq(tree.get_file_text('foo-id'), 'hello')
95
def test_revision_tree(self):
96
b1 = self.get_branch()
97
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
98
tree = b1.revision_tree('revision-1')
99
tree = b1.revision_tree(None)
100
self.assertEqual(len(tree.list_files()), 0)
101
tree = b1.revision_tree(NULL_REVISION)
102
self.assertEqual(len(tree.list_files()), 0)
104
def get_unbalanced_branch_pair(self):
105
"""Return two branches, a and b, with one file in a."""
107
br_a = self.make_branch('a')
108
file('a/b', 'wb').write('b')
109
br_a.working_tree().add('b')
110
commit(br_a, "silly commit", rev_id='A')
112
br_b = self.make_branch('b')
115
def get_balanced_branch_pair(self):
116
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
117
br_a, br_b = self.get_unbalanced_branch_pair()
118
br_a.push_stores(br_b)
121
def test_push_stores(self):
122
"""Copy the stores from one branch to another"""
123
br_a, br_b = self.get_unbalanced_branch_pair()
124
# ensure the revision is missing.
125
self.assertRaises(NoSuchRevision, br_b.get_revision,
126
br_a.revision_history()[0])
127
br_a.push_stores(br_b)
128
# check that b now has all the data from a's first commit.
129
rev = br_b.get_revision(br_a.revision_history()[0])
130
tree = br_b.revision_tree(br_a.revision_history()[0])
132
if tree.inventory[file_id].kind == "file":
133
tree.get_file(file_id).read()
136
def test_copy_branch(self):
137
"""Copy the stores from one branch to another"""
138
br_a, br_b = self.get_balanced_branch_pair()
139
commit(br_b, "silly commit")
141
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
142
self.assertEqual(br_a.revision_history(), br_c.revision_history())
144
def test_copy_partial(self):
145
"""Copy only part of the history of a branch."""
146
self.build_tree(['a/', 'a/one'])
147
br_a = self.make_branch('a')
148
br_a.working_tree().add(['one'])
149
br_a.working_tree().commit('commit one', rev_id='u@d-1')
150
self.build_tree(['a/two'])
151
br_a.working_tree().add(['two'])
152
br_a.working_tree().commit('commit two', rev_id='u@d-2')
153
br_b = copy_branch(br_a, 'b', revision='u@d-1')
154
self.assertEqual(br_b.last_revision(), 'u@d-1')
155
self.assertTrue(os.path.exists('b/one'))
156
self.assertFalse(os.path.exists('b/two'))
158
def test_record_initial_ghost_merge(self):
159
"""A pending merge with no revision present is still a merge."""
160
branch = self.get_branch()
161
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
162
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
163
rev = branch.get_revision(branch.last_revision())
164
self.assertEqual(len(rev.parent_ids), 1)
165
# parent_sha1s is not populated now, WTF. rbc 20051003
166
self.assertEqual(len(rev.parent_sha1s), 0)
167
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
169
def test_bad_revision(self):
170
self.assertRaises(errors.InvalidRevisionId, self.get_branch().get_revision, None)
173
# compare the gpg-to-sign info for a commit with a ghost and
174
# an identical tree without a ghost
175
# fetch missing should rewrite the TOC of weaves to list newly available parents.
177
def test_pending_merges(self):
178
"""Tracking pending-merged revisions."""
179
b = self.get_branch()
180
wt = b.working_tree()
181
self.assertEquals(wt.pending_merges(), [])
182
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
183
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
184
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
185
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
186
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
187
self.assertEquals(wt.pending_merges(),
188
['foo@azkhazan-123123-abcabc',
189
'wibble@fofof--20050401--1928390812'])
190
b.working_tree().commit("commit from base with two merges")
191
rev = b.get_revision(b.revision_history()[0])
192
self.assertEquals(len(rev.parent_ids), 2)
193
self.assertEquals(rev.parent_ids[0],
194
'foo@azkhazan-123123-abcabc')
195
self.assertEquals(rev.parent_ids[1],
196
'wibble@fofof--20050401--1928390812')
197
# list should be cleared when we do a commit
198
self.assertEquals(wt.pending_merges(), [])
200
def test_sign_existing_revision(self):
201
branch = self.get_branch()
202
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
203
from bzrlib.testament import Testament
204
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
205
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
206
branch.revision_store.get('A', 'sig').read())
208
def test_store_signature(self):
209
branch = self.get_branch()
210
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
212
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
214
def test__relcontrolfilename(self):
215
self.assertEqual('.bzr/%25', self.get_branch()._rel_controlfilename('%'))
217
def test__relcontrolfilename_empty(self):
218
self.assertEqual('.bzr', self.get_branch()._rel_controlfilename(''))
220
def test_nicks(self):
221
"""Branch nicknames"""
223
branch = self.make_branch('bzr.dev')
224
self.assertEqual(branch.nick, 'bzr.dev')
225
os.rename('bzr.dev', 'bzr.ab')
226
branch = Branch.open('bzr.ab')
227
self.assertEqual(branch.nick, 'bzr.ab')
228
branch.nick = "Aaron's branch"
229
branch.nick = "Aaron's branch"
230
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
231
self.assertEqual(branch.nick, "Aaron's branch")
232
os.rename('bzr.ab', 'integration')
233
branch = Branch.open('integration')
234
self.assertEqual(branch.nick, "Aaron's branch")
235
branch.nick = u"\u1234"
236
self.assertEqual(branch.nick, u"\u1234")
238
def test_commit_nicks(self):
239
"""Nicknames are committed to the revision"""
241
branch = self.get_branch()
242
branch.nick = "My happy branch"
243
branch.working_tree().commit('My commit respect da nick.')
244
committed = branch.get_revision(branch.last_revision())
245
self.assertEqual(committed.properties["branch-nick"],
249
class TestRemote(TestCaseWithWebserver):
251
def test_open_containing(self):
252
self.assertRaises(NotBranchError, Branch.open_containing,
253
self.get_remote_url(''))
254
self.assertRaises(NotBranchError, Branch.open_containing,
255
self.get_remote_url('g/p/q'))
257
branch = self.branch_format.initialize('.')
258
except UninitializableFormat:
259
raise TestSkipped("Format %s is not initializable.")
260
branch, relpath = Branch.open_containing(self.get_remote_url(''))
261
self.assertEqual('', relpath)
262
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
263
self.assertEqual('g/p/q', relpath)
265
# TODO: rewrite this as a regular unittest, without relying on the displayed output
266
# >>> from bzrlib.commit import commit
267
# >>> bzrlib.trace.silent = True
268
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
269
# >>> br1.working_tree().add('foo')
270
# >>> br1.working_tree().add('bar')
271
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
272
# >>> br2 = ScratchBranch()
273
# >>> br2.update_revisions(br1)
275
# Added 1 inventories.
277
# >>> br2.revision_history()
279
# >>> br2.update_revisions(br1)
281
# >>> br1.text_store.total_size() == br2.text_store.total_size()
284
class InstrumentedTransaction(object):
287
self.calls.append('finish')
293
class TestDecorator(object):
299
self._calls.append('lr')
301
def lock_write(self):
302
self._calls.append('lw')
305
self._calls.append('ul')
308
def do_with_read(self):
312
def except_with_read(self):
316
def do_with_write(self):
320
def except_with_write(self):
324
class TestDecorators(TestCase):
326
def test_needs_read_lock(self):
327
branch = TestDecorator()
328
self.assertEqual(1, branch.do_with_read())
329
self.assertEqual(['lr', 'ul'], branch._calls)
331
def test_excepts_in_read_lock(self):
332
branch = TestDecorator()
333
self.assertRaises(RuntimeError, branch.except_with_read)
334
self.assertEqual(['lr', 'ul'], branch._calls)
336
def test_needs_write_lock(self):
337
branch = TestDecorator()
338
self.assertEqual(2, branch.do_with_write())
339
self.assertEqual(['lw', 'ul'], branch._calls)
341
def test_excepts_in_write_lock(self):
342
branch = TestDecorator()
343
self.assertRaises(RuntimeError, branch.except_with_write)
344
self.assertEqual(['lw', 'ul'], branch._calls)
347
class TestBranchTransaction(TestCaseWithBranch):
350
super(TestBranchTransaction, self).setUp()
353
def test_default_get_transaction(self):
354
"""branch.get_transaction on a new branch should give a PassThrough."""
355
self.failUnless(isinstance(self.get_branch().get_transaction(),
356
transactions.PassThroughTransaction))
358
def test__set_new_transaction(self):
359
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
361
def test__set_over_existing_transaction_raises(self):
362
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
363
self.assertRaises(errors.LockError,
364
self.get_branch()._set_transaction,
365
transactions.ReadOnlyTransaction())
367
def test_finish_no_transaction_raises(self):
368
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
370
def test_finish_readonly_transaction_works(self):
371
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
372
self.get_branch()._finish_transaction()
373
self.assertEqual(None, self.get_branch()._transaction)
375
def test_unlock_calls_finish(self):
376
self.get_branch().lock_read()
377
transaction = InstrumentedTransaction()
378
self.get_branch()._transaction = transaction
379
self.get_branch().unlock()
380
self.assertEqual(['finish'], transaction.calls)
382
def test_lock_read_acquires_ro_transaction(self):
383
self.get_branch().lock_read()
384
self.failUnless(isinstance(self.get_branch().get_transaction(),
385
transactions.ReadOnlyTransaction))
386
self.get_branch().unlock()
388
def test_lock_write_acquires_passthrough_transaction(self):
389
self.get_branch().lock_write()
390
# cannot use get_transaction as its magic
391
self.failUnless(isinstance(self.get_branch()._transaction,
392
transactions.PassThroughTransaction))
393
self.get_branch().unlock()
396
class TestBranchPushLocations(TestCaseWithBranch):
398
def test_get_push_location_unset(self):
399
self.assertEqual(None, self.get_branch().get_push_location())
401
def test_get_push_location_exact(self):
402
from bzrlib.config import (branches_config_filename,
403
ensure_config_dir_exists)
404
ensure_config_dir_exists()
405
fn = branches_config_filename()
406
print >> open(fn, 'wt'), ("[%s]\n"
407
"push_location=foo" %
409
self.assertEqual("foo", self.get_branch().get_push_location())
411
def test_set_push_location(self):
412
from bzrlib.config import (branches_config_filename,
413
ensure_config_dir_exists)
414
ensure_config_dir_exists()
415
fn = branches_config_filename()
416
self.get_branch().set_push_location('foo')
417
self.assertFileEqual("[%s]\n"
418
"push_location = foo" % getcwd(),
421
# TODO RBC 20051029 test getting a push location from a branch in a
422
# recursive section - that is, it appends the branch name.
425
class TestFormat(TestCaseWithBranch):
426
"""Tests for the format itself."""
428
def test_format_initialize_find_open(self):
429
# loopback test to check the current format initializes to itself.
430
if not self.branch_format.is_supported():
431
# unsupported formats are not loopback testable
432
# because the default open will not open them and
433
# they may not be initializable.
435
# supported formats must be able to init and open
436
t = get_transport('.')
437
made_branch = self.branch_format.initialize(t.base)
438
self.failUnless(isinstance(made_branch, branch.Branch))
439
self.assertEqual(self.branch_format,
440
branch.BzrBranchFormat.find_format(t))
441
direct_opened_branch = self.branch_format.open(t)
442
opened_branch = branch.Branch.open(t.base)
443
self.assertEqual(made_branch._branch_format,
444
opened_branch._branch_format)
445
self.assertEqual(direct_opened_branch._branch_format,
446
opened_branch._branch_format)
447
self.failUnless(isinstance(opened_branch, branch.Branch))
449
def test_open_not_branch(self):
450
self.assertRaises(NoSuchFile,
451
self.branch_format.open,