23
import bzrlib.bzrdir as bzrdir
24
33
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
34
from bzrlib.delta import TreeDelta
27
35
from bzrlib.errors import (FileExists,
30
38
UninitializableFormat,
34
41
from bzrlib.osutils import getcwd
42
import bzrlib.revision
43
from bzrlib.symbol_versioning import deprecated_in
35
44
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
45
from bzrlib.tests.branch_implementations import TestCaseWithBranch
46
from bzrlib.tests.http_server import HttpServer
37
47
from bzrlib.trace import mutter
38
import bzrlib.transactions as transactions
39
48
from bzrlib.transport import get_transport
40
from bzrlib.transport.http import HttpServer
41
49
from bzrlib.transport.memory import MemoryServer
42
50
from bzrlib.upgrade import upgrade
43
51
from bzrlib.workingtree import WorkingTree
46
# TODO: Make a branch using basis branch, and check that it
47
# doesn't request any files that could have been avoided, by
48
# hooking into the Transport.
51
class TestCaseWithBranch(TestCaseWithBzrDir):
54
super(TestCaseWithBranch, self).setUp()
58
if self.branch is None:
59
self.branch = self.make_branch('')
62
def make_branch(self, relpath, format=None):
63
repo = self.make_repository(relpath, format=format)
64
# fixme RBC 20060210 this isnt necessarily a fixable thing,
65
# Skipped is the wrong exception to raise.
67
return self.branch_format.initialize(repo.bzrdir)
68
except errors.UninitializableFormat:
69
raise TestSkipped('Uninitializable branch format')
71
def make_repository(self, relpath, shared=False, format=None):
72
made_control = self.make_bzrdir(relpath, format=format)
73
return made_control.create_repository(shared=shared)
76
54
class TestBranch(TestCaseWithBranch):
78
def test_append_revisions(self):
79
"""Test appending more than one revision"""
56
def test_create_tree_with_merge(self):
57
tree = self.create_tree_with_merge()
59
self.addCleanup(tree.unlock)
60
graph = tree.branch.repository.get_graph()
61
ancestry_graph = graph.get_parent_map(
62
tree.branch.repository.all_revision_ids())
63
self.assertEqual({'rev-1':('null:',),
65
'rev-1.1.1':('rev-1', ),
66
'rev-3':('rev-2', 'rev-1.1.1', ),
69
def test_revision_ids_are_utf8(self):
70
wt = self.make_branch_and_tree('tree')
71
wt.commit('f', rev_id='rev1')
72
wt.commit('f', rev_id='rev2')
73
wt.commit('f', rev_id='rev3')
80
75
br = self.get_branch()
81
br.append_revision("rev1")
82
self.assertEquals(br.revision_history(), ["rev1",])
83
br.append_revision("rev2", "rev3")
84
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
77
br.set_revision_history(['rev1', 'rev2', 'rev3'])
78
rh = br.revision_history()
79
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
80
for revision_id in rh:
81
self.assertIsInstance(revision_id, str)
82
last = br.last_revision()
83
self.assertEqual('rev3', last)
84
self.assertIsInstance(last, str)
85
revno, last = br.last_revision_info()
86
self.assertEqual(3, revno)
87
self.assertEqual('rev3', last)
88
self.assertIsInstance(last, str)
86
90
def test_fetch_revisions(self):
87
91
"""Test fetch-revision operation."""
88
get_transport(self.get_url()).mkdir('b1')
89
get_transport(self.get_url()).mkdir('b2')
90
92
wt = self.make_branch_and_tree('b1')
92
b2 = self.make_branch('b2')
93
file('b1/foo', 'w').write('hello')
94
self.build_tree_contents([('b1/foo', 'hello')])
94
95
wt.add(['foo'], ['foo-id'])
95
96
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
98
b2 = self.make_branch('b2')
98
99
self.assertEqual((1, []), b2.fetch(b1))
100
101
rev = b2.repository.get_revision('revision-1')
101
102
tree = b2.repository.revision_tree('revision-1')
104
self.addCleanup(tree.unlock)
102
105
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
107
def test_get_revision_delta(self):
108
tree_a = self.make_branch_and_tree('a')
109
self.build_tree(['a/foo'])
110
tree_a.add('foo', 'file1')
111
tree_a.commit('rev1', rev_id='rev1')
112
self.build_tree(['a/vla'])
113
tree_a.add('vla', 'file2')
114
tree_a.commit('rev2', rev_id='rev2')
116
delta = tree_a.branch.get_revision_delta(1)
117
self.assertIsInstance(delta, TreeDelta)
118
self.assertEqual([('foo', 'file1', 'file')], delta.added)
119
delta = tree_a.branch.get_revision_delta(2)
120
self.assertIsInstance(delta, TreeDelta)
121
self.assertEqual([('vla', 'file2', 'file')], delta.added)
104
123
def get_unbalanced_tree_pair(self):
105
124
"""Return two branches, a and b, with one file in a."""
106
get_transport(self.get_url()).mkdir('a')
107
125
tree_a = self.make_branch_and_tree('a')
108
file('a/b', 'wb').write('b')
126
self.build_tree_contents([('a/b', 'b')])
110
128
tree_a.commit("silly commit", rev_id='A')
112
get_transport(self.get_url()).mkdir('b')
113
130
tree_b = self.make_branch_and_tree('b')
114
131
return tree_a, tree_b
119
136
tree_b.branch.repository.fetch(tree_a.branch.repository)
120
137
return tree_a, tree_b
122
def test_clone_branch(self):
123
"""Copy the stores from one branch to another"""
124
tree_a, tree_b = self.get_balanced_branch_pair()
125
tree_b.commit("silly commit")
127
# this fails to test that the history from a was not used.
128
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
129
self.assertEqual(tree_a.branch.revision_history(),
130
dir_c.open_branch().revision_history())
132
139
def test_clone_partial(self):
133
140
"""Copy only part of the history of a branch."""
134
141
# TODO: RBC 20060208 test with a revision not on revision-history.
135
142
# what should that behaviour be ? Emailed the list.
136
wt_a = self.make_branch_and_tree('a')
137
self.build_tree(['a/one'])
139
wt_a.commit('commit one', rev_id='1')
140
self.build_tree(['a/two'])
142
wt_a.commit('commit two', rev_id='2')
143
repo_b = self.make_repository('b')
144
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
145
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
146
self.assertEqual(br_b.last_revision(), '1')
148
def test_sprout_partial(self):
149
# test sprouting with a prefix of the revision-history.
150
# also needs not-on-revision-history behaviour defined.
151
wt_a = self.make_branch_and_tree('a')
152
self.build_tree(['a/one'])
154
wt_a.commit('commit one', rev_id='1')
155
self.build_tree(['a/two'])
157
wt_a.commit('commit two', rev_id='2')
158
repo_b = self.make_repository('b')
159
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
160
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
161
self.assertEqual(br_b.last_revision(), '1')
143
# First, make a branch with two commits.
144
wt_a = self.make_branch_and_tree('a')
145
self.build_tree(['a/one'])
147
wt_a.commit('commit one', rev_id='1')
148
self.build_tree(['a/two'])
150
wt_a.commit('commit two', rev_id='2')
151
# Now make a copy of the repository.
152
repo_b = self.make_repository('b')
153
wt_a.branch.repository.copy_content_into(repo_b)
154
# wt_a might be a lightweight checkout, so get a hold of the actual
155
# branch (because you can't do a partial clone of a lightweight
157
branch = wt_a.branch.bzrdir.open_branch()
158
# Then make a branch where the new repository is, but specify a revision
159
# ID. The new branch's history will stop at the specified revision.
160
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
161
self.assertEqual('1', br_b.last_revision())
163
def get_parented_branch(self):
164
wt_a = self.make_branch_and_tree('a')
165
self.build_tree(['a/one'])
167
wt_a.commit('commit one', rev_id='1')
169
branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
170
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
163
173
def test_clone_branch_nickname(self):
164
174
# test the nick name is preserved always
165
raise TestSkipped('XXX branch cloning is not yet tested..')
175
raise TestSkipped('XXX branch cloning is not yet tested.')
167
177
def test_clone_branch_parent(self):
168
178
# test the parent is preserved always
169
raise TestSkipped('XXX branch cloning is not yet tested..')
171
def test_sprout_branch_nickname(self):
172
# test the nick name is reset always
173
raise TestSkipped('XXX branch sprouting is not yet tested..')
175
def test_sprout_branch_parent(self):
176
source = self.make_branch('source')
177
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
178
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
180
def test_record_initial_ghost_merge(self):
181
"""A pending merge with no revision present is still a merge."""
179
branch_b = self.get_parented_branch()
180
repo_c = self.make_repository('c')
181
branch_b.repository.copy_content_into(repo_c)
182
branch_c = branch_b.clone(repo_c.bzrdir)
183
self.assertNotEqual(None, branch_c.get_parent())
184
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
186
# We can also set a specific parent, and it should be honored
187
random_parent = 'http://bazaar-vcs.org/path/to/branch'
188
branch_b.set_parent(random_parent)
189
repo_d = self.make_repository('d')
190
branch_b.repository.copy_content_into(repo_d)
191
branch_d = branch_b.clone(repo_d.bzrdir)
192
self.assertEqual(random_parent, branch_d.get_parent())
194
def test_submit_branch(self):
195
"""Submit location can be queried and set"""
196
branch = self.make_branch('branch')
197
self.assertEqual(branch.get_submit_branch(), None)
198
branch.set_submit_branch('sftp://example.com')
199
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
200
branch.set_submit_branch('sftp://example.net')
201
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
203
def test_public_branch(self):
204
"""public location can be queried and set"""
205
branch = self.make_branch('branch')
206
self.assertEqual(branch.get_public_branch(), None)
207
branch.set_public_branch('sftp://example.com')
208
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
209
branch.set_public_branch('sftp://example.net')
210
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
211
branch.set_public_branch(None)
212
self.assertEqual(branch.get_public_branch(), None)
214
def test_record_initial_ghost(self):
215
"""Branches should support having ghosts."""
182
216
wt = self.make_branch_and_tree('.')
184
wt.add_pending_merge('non:existent@rev--ision--0--2')
185
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
186
rev = branch.repository.get_revision(branch.last_revision())
187
self.assertEqual(len(rev.parent_ids), 1)
217
wt.set_parent_ids(['non:existent@rev--ision--0--2'],
218
allow_leftmost_as_ghost=True)
219
self.assertEqual(['non:existent@rev--ision--0--2'],
221
rev_id = wt.commit('commit against a ghost first parent.')
222
rev = wt.branch.repository.get_revision(rev_id)
223
self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
188
224
# parent_sha1s is not populated now, WTF. rbc 20051003
189
225
self.assertEqual(len(rev.parent_sha1s), 0)
190
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
227
def test_record_two_ghosts(self):
228
"""Recording with all ghosts works."""
229
wt = self.make_branch_and_tree('.')
231
'foo@azkhazan-123123-abcabc',
232
'wibble@fofof--20050401--1928390812',
234
allow_leftmost_as_ghost=True)
235
rev_id = wt.commit("commit from ghost base with one merge")
236
# the revision should have been committed with two parents
237
rev = wt.branch.repository.get_revision(rev_id)
238
self.assertEqual(['foo@azkhazan-123123-abcabc',
239
'wibble@fofof--20050401--1928390812'],
192
242
def test_bad_revision(self):
193
243
self.assertRaises(errors.InvalidRevisionId,
199
249
# an identical tree without a ghost
200
250
# fetch missing should rewrite the TOC of weaves to list newly available parents.
202
def test_pending_merges(self):
203
"""Tracking pending-merged revisions."""
204
wt = self.make_branch_and_tree('.')
206
self.assertEquals(wt.pending_merges(), [])
207
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
208
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
209
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
210
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
211
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
212
self.assertEquals(wt.pending_merges(),
213
['foo@azkhazan-123123-abcabc',
214
'wibble@fofof--20050401--1928390812'])
215
wt.commit("commit from base with two merges")
216
rev = b.repository.get_revision(b.revision_history()[0])
217
self.assertEquals(len(rev.parent_ids), 2)
218
self.assertEquals(rev.parent_ids[0],
219
'foo@azkhazan-123123-abcabc')
220
self.assertEquals(rev.parent_ids[1],
221
'wibble@fofof--20050401--1928390812')
222
# list should be cleared when we do a commit
223
self.assertEquals(wt.pending_merges(), [])
225
252
def test_sign_existing_revision(self):
226
253
wt = self.make_branch_and_tree('.')
227
254
branch = wt.branch
228
255
wt.commit("base", allow_pointless=True, rev_id='A')
229
256
from bzrlib.testament import Testament
230
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
257
strategy = gpg.LoopbackGPGStrategy(None)
258
branch.repository.lock_write()
259
branch.repository.start_write_group()
231
260
branch.repository.sign_revision('A', strategy)
232
self.assertEqual(Testament.from_revision(branch.repository,
233
'A').as_short_text(),
261
branch.repository.commit_write_group()
262
branch.repository.unlock()
263
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
264
Testament.from_revision(branch.repository,
265
'A').as_short_text() +
266
'-----END PSEUDO-SIGNED CONTENT-----\n',
234
267
branch.repository.get_signature_text('A'))
236
269
def test_store_signature(self):
237
270
wt = self.make_branch_and_tree('.')
238
271
branch = wt.branch
239
branch.repository.store_revision_signature(
240
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
274
branch.repository.start_write_group()
276
branch.repository.store_revision_signature(
277
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
279
branch.repository.abort_write_group()
282
branch.repository.commit_write_group()
285
# A signature without a revision should not be accessible.
241
286
self.assertRaises(errors.NoSuchRevision,
242
287
branch.repository.has_signature_for_revision_id,
244
289
wt.commit("base", allow_pointless=True, rev_id='A')
245
self.assertEqual('FOO',
290
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
291
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
246
292
branch.repository.get_signature_text('A'))
248
294
def test_branch_keeps_signatures(self):
249
295
wt = self.make_branch_and_tree('source')
250
296
wt.commit('A', allow_pointless=True, rev_id='A')
251
wt.branch.repository.sign_revision('A',
252
bzrlib.gpg.LoopbackGPGStrategy(None))
297
repo = wt.branch.repository
299
repo.start_write_group()
300
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
301
repo.commit_write_group()
253
303
#FIXME: clone should work to urls,
254
304
# wt.clone should work to disks.
255
305
self.build_tree(['target/'])
256
d2 = wt.bzrdir.clone('target')
257
self.assertEqual(wt.branch.repository.get_signature_text('A'),
306
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
307
self.assertEqual(repo.get_signature_text('A'),
258
308
d2.open_repository().get_signature_text('A'))
310
def test_missing_revisions(self):
311
t1 = self.make_branch_and_tree('b1')
312
rev1 = t1.commit('one')
313
t2 = t1.bzrdir.sprout('b2').open_workingtree()
314
rev2 = t1.commit('two')
315
rev3 = t1.commit('three')
317
self.assertEqual([rev2, rev3],
318
self.applyDeprecated(deprecated_in((1, 6, 0)),
319
t2.branch.missing_revisions, t1.branch))
322
self.applyDeprecated(deprecated_in((1, 6, 0)),
323
t2.branch.missing_revisions, t1.branch, stop_revision=1))
324
self.assertEqual([rev2],
325
self.applyDeprecated(deprecated_in((1, 6, 0)),
326
t2.branch.missing_revisions, t1.branch, stop_revision=2))
327
self.assertEqual([rev2, rev3],
328
self.applyDeprecated(deprecated_in((1, 6, 0)),
329
t2.branch.missing_revisions, t1.branch, stop_revision=3))
331
self.assertRaises(errors.NoSuchRevision,
332
self.applyDeprecated, deprecated_in((1, 6, 0)),
333
t2.branch.missing_revisions, t1.branch, stop_revision=4)
335
rev4 = t2.commit('four')
336
self.assertRaises(errors.DivergedBranches,
337
self.applyDeprecated, deprecated_in((1, 6, 0)),
338
t2.branch.missing_revisions, t1.branch)
260
340
def test_nicks(self):
261
"""Branch nicknames"""
341
"""Test explicit and implicit branch nicknames.
343
Nicknames are implicitly the name of the branch's directory, unless an
344
explicit nickname is set. That is, an explicit nickname always
345
overrides the implicit one.
262
347
t = get_transport(self.get_url())
264
348
branch = self.make_branch('bzr.dev')
349
# The nick will be 'bzr.dev', because there is no explicit nick set.
265
350
self.assertEqual(branch.nick, 'bzr.dev')
351
# Move the branch to a different directory, 'bzr.ab'. Now that branch
352
# will report its nick as 'bzr.ab'.
266
353
t.move('bzr.dev', 'bzr.ab')
267
354
branch = Branch.open(self.get_url('bzr.ab'))
268
355
self.assertEqual(branch.nick, 'bzr.ab')
269
branch.nick = "Aaron's branch"
270
branch.nick = "Aaron's branch"
274
branch.control_files.controlfilename("branch.conf")
356
# Set the branch nick explicitly. This will ensure there's a branch
357
# config file in the branch.
358
branch.nick = "Aaron's branch"
359
if not isinstance(branch, remote.RemoteBranch):
360
self.failUnless(branch._transport.has("branch.conf"))
361
# Because the nick has been set explicitly, the nick is now always
362
# "Aaron's branch", regardless of directory name.
278
363
self.assertEqual(branch.nick, "Aaron's branch")
279
364
t.move('bzr.ab', 'integration')
280
365
branch = Branch.open(self.get_url('integration'))
317
402
self.failUnless(len(text))
319
404
def test_get_commit_builder(self):
320
self.assertIsInstance(self.make_branch(".").get_commit_builder([]),
321
bzrlib.repository.CommitBuilder)
405
branch = self.make_branch(".")
407
builder = branch.get_commit_builder([])
408
self.assertIsInstance(builder, repository.CommitBuilder)
409
branch.repository.commit_write_group()
412
def test_generate_revision_history(self):
413
"""Create a fake revision history easily."""
414
tree = self.make_branch_and_tree('.')
415
rev1 = tree.commit('foo')
416
orig_history = tree.branch.revision_history()
417
rev2 = tree.commit('bar', allow_pointless=True)
418
tree.branch.generate_revision_history(rev1)
419
self.assertEqual(orig_history, tree.branch.revision_history())
421
def test_generate_revision_history_NULL_REVISION(self):
422
tree = self.make_branch_and_tree('.')
423
rev1 = tree.commit('foo')
424
tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
425
self.assertEqual([], tree.branch.revision_history())
427
def test_create_checkout(self):
428
tree_a = self.make_branch_and_tree('a')
429
branch_a = tree_a.branch
430
checkout_b = branch_a.create_checkout('b')
431
self.assertEqual('null:', checkout_b.last_revision())
432
checkout_b.commit('rev1', rev_id='rev1')
433
self.assertEqual('rev1', branch_a.last_revision())
434
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
436
checkout_c = branch_a.create_checkout('c', lightweight=True)
437
self.assertEqual('rev1', checkout_c.last_revision())
438
checkout_c.commit('rev2', rev_id='rev2')
439
self.assertEqual('rev2', branch_a.last_revision())
440
self.assertEqual(checkout_c.branch.base, branch_a.base)
443
checkout_d = branch_a.create_checkout('d', lightweight=True)
444
self.assertEqual('rev2', checkout_d.last_revision())
446
checkout_e = branch_a.create_checkout('e')
447
self.assertEqual('rev2', checkout_e.last_revision())
449
def test_create_anonymous_lightweight_checkout(self):
450
"""A lightweight checkout from a readonly branch should succeed."""
451
tree_a = self.make_branch_and_tree('a')
452
rev_id = tree_a.commit('put some content in the branch')
453
# open the branch via a readonly transport
454
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
455
# sanity check that the test will be valid
456
self.assertRaises((errors.LockError, errors.TransportNotPossible),
457
source_branch.lock_write)
458
checkout = source_branch.create_checkout('c', lightweight=True)
459
self.assertEqual(rev_id, checkout.last_revision())
461
def test_create_anonymous_heavyweight_checkout(self):
462
"""A regular checkout from a readonly branch should succeed."""
463
tree_a = self.make_branch_and_tree('a')
464
rev_id = tree_a.commit('put some content in the branch')
465
# open the branch via a readonly transport
466
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
467
# sanity check that the test will be valid
468
self.assertRaises((errors.LockError, errors.TransportNotPossible),
469
source_branch.lock_write)
470
checkout = source_branch.create_checkout('c')
471
self.assertEqual(rev_id, checkout.last_revision())
473
def test_set_revision_history(self):
474
tree = self.make_branch_and_tree('a')
475
tree.commit('a commit', rev_id='rev1')
477
br.set_revision_history(["rev1"])
478
self.assertEquals(br.revision_history(), ["rev1"])
479
br.set_revision_history([])
480
self.assertEquals(br.revision_history(), [])
324
483
class ChrootedTests(TestCaseWithBranch):
427
568
self.assertEqual(['lw', 'ul'], branch._calls)
430
class TestBranchTransaction(TestCaseWithBranch):
433
super(TestBranchTransaction, self).setUp()
436
def test_default_get_transaction(self):
437
"""branch.get_transaction on a new branch should give a PassThrough."""
438
self.failUnless(isinstance(self.get_branch().get_transaction(),
439
transactions.PassThroughTransaction))
441
def test__set_new_transaction(self):
442
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
444
def test__set_over_existing_transaction_raises(self):
445
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
446
self.assertRaises(errors.LockError,
447
self.get_branch()._set_transaction,
448
transactions.ReadOnlyTransaction())
450
def test_finish_no_transaction_raises(self):
451
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
453
def test_finish_readonly_transaction_works(self):
454
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
455
self.get_branch()._finish_transaction()
456
self.assertEqual(None, self.get_branch().control_files._transaction)
458
def test_unlock_calls_finish(self):
459
self.get_branch().lock_read()
460
transaction = InstrumentedTransaction()
461
self.get_branch().control_files._transaction = transaction
462
self.get_branch().unlock()
463
self.assertEqual(['finish'], transaction.calls)
465
def test_lock_read_acquires_ro_transaction(self):
466
self.get_branch().lock_read()
467
self.failUnless(isinstance(self.get_branch().get_transaction(),
468
transactions.ReadOnlyTransaction))
469
self.get_branch().unlock()
471
def test_lock_write_acquires_write_transaction(self):
472
self.get_branch().lock_write()
473
# cannot use get_transaction as its magic
474
self.failUnless(isinstance(self.get_branch().control_files._transaction,
475
transactions.WriteTransaction))
476
self.get_branch().unlock()
479
571
class TestBranchPushLocations(TestCaseWithBranch):
481
573
def test_get_push_location_unset(self):
482
574
self.assertEqual(None, self.get_branch().get_push_location())
484
576
def test_get_push_location_exact(self):
485
from bzrlib.config import (branches_config_filename,
577
from bzrlib.config import (locations_config_filename,
486
578
ensure_config_dir_exists)
487
579
ensure_config_dir_exists()
488
fn = branches_config_filename()
489
print >> open(fn, 'wt'), ("[%s]\n"
490
"push_location=foo" %
491
self.get_branch().base[:-1])
580
fn = locations_config_filename()
581
open(fn, 'wt').write(("[%s]\n"
582
"push_location=foo\n" %
583
self.get_branch().base[:-1]))
492
584
self.assertEqual("foo", self.get_branch().get_push_location())
494
586
def test_set_push_location(self):
495
from bzrlib.config import (branches_config_filename,
496
ensure_config_dir_exists)
497
ensure_config_dir_exists()
498
fn = branches_config_filename()
499
self.get_branch().set_push_location('foo')
500
self.assertFileEqual("[%s]\n"
501
"push_location = foo" % self.get_branch().base[:-1],
504
# TODO RBC 20051029 test getting a push location from a branch in a
505
# recursive section - that is, it appends the branch name.
587
branch = self.get_branch()
588
branch.set_push_location('foo')
589
self.assertEqual('foo', branch.get_push_location())
508
592
class TestFormat(TestCaseWithBranch):
509
593
"""Tests for the format itself."""
595
def test_get_reference(self):
596
"""get_reference on all regular branches should return None."""
597
if not self.branch_format.is_supported():
598
# unsupported formats are not loopback testable
599
# because the default open will not open them and
600
# they may not be initializable.
602
made_branch = self.make_branch('.')
603
self.assertEqual(None,
604
made_branch._format.get_reference(made_branch.bzrdir))
606
def test_set_reference(self):
607
"""set_reference on all regular branches should be callable."""
608
if not self.branch_format.is_supported():
609
# unsupported formats are not loopback testable
610
# because the default open will not open them and
611
# they may not be initializable.
613
this_branch = self.make_branch('this')
614
other_branch = self.make_branch('other')
616
this_branch._format.set_reference(this_branch.bzrdir, other_branch)
617
except NotImplementedError:
621
ref = this_branch._format.get_reference(this_branch.bzrdir)
622
self.assertEqual(ref, other_branch.base)
511
624
def test_format_initialize_find_open(self):
512
625
# loopback test to check the current format initializes to itself.
513
626
if not self.branch_format.is_supported():
540
653
except NotImplementedError:
542
655
self.assertEqual(self.branch_format,
543
bzrlib.branch.BranchFormat.find_format(opened_control))
656
opened_control.find_branch_format())
659
class TestBound(TestCaseWithBranch):
661
def test_bind_unbind(self):
662
branch = self.make_branch('1')
663
branch2 = self.make_branch('2')
666
except errors.UpgradeRequired:
667
raise tests.TestNotApplicable('Format does not support binding')
668
self.assertTrue(branch.unbind())
669
self.assertFalse(branch.unbind())
670
self.assertIs(None, branch.get_bound_location())
672
def test_old_bound_location(self):
673
branch = self.make_branch('branch1')
675
self.assertIs(None, branch.get_old_bound_location())
676
except errors.UpgradeRequired:
677
raise tests.TestNotApplicable(
678
'Format does not store old bound locations')
679
branch2 = self.make_branch('branch2')
681
self.assertIs(None, branch.get_old_bound_location())
683
self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
685
def test_bind_diverged(self):
686
tree_a = self.make_branch_and_tree('tree_a')
687
tree_a.commit('rev1a')
688
tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
689
tree_a.commit('rev2a')
690
tree_b.commit('rev2b')
692
tree_b.branch.bind(tree_a.branch)
693
except errors.UpgradeRequired:
694
raise tests.TestNotApplicable('Format does not support binding')
697
class TestStrict(TestCaseWithBranch):
699
def test_strict_history(self):
700
tree1 = self.make_branch_and_tree('tree1')
702
tree1.branch.set_append_revisions_only(True)
703
except errors.UpgradeRequired:
704
raise TestSkipped('Format does not support strict history')
705
tree1.commit('empty commit')
706
tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
707
tree2.commit('empty commit 2')
708
tree1.pull(tree2.branch)
709
tree1.commit('empty commit 3')
710
tree2.commit('empty commit 4')
711
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
712
tree2.merge_from_branch(tree1.branch)
713
tree2.commit('empty commit 5')
714
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
716
tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
717
tree3.merge_from_branch(tree2.branch)
718
tree3.commit('empty commit 6')
719
tree2.pull(tree3.branch)