39
41
from bzrlib.osutils import getcwd
40
42
import bzrlib.revision
43
from bzrlib.symbol_versioning import deprecated_in
41
44
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
45
from bzrlib.tests.branch_implementations import TestCaseWithBranch
46
from bzrlib.tests.http_server import HttpServer
43
47
from bzrlib.trace import mutter
44
48
from bzrlib.transport import get_transport
45
from bzrlib.transport.http import HttpServer
46
49
from bzrlib.transport.memory import MemoryServer
47
50
from bzrlib.upgrade import upgrade
48
51
from bzrlib.workingtree import WorkingTree
51
# TODO: Make a branch using basis branch, and check that it
52
# doesn't request any files that could have been avoided, by
53
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isn't necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository at relpath and return it.

    A control directory is first made at relpath with make_bzrdir (format
    is passed straight through), and the repository is then created inside
    that control directory, forwarding the shared flag.
    """
    return self.make_bzrdir(relpath, format=format).create_repository(
        shared=shared)
81
54
class TestBranch(TestCaseWithBranch):
83
def test_append_revisions(self):
84
"""Test appending more than one revision"""
56
def test_create_tree_with_merge(self):
57
tree = self.create_tree_with_merge()
59
self.addCleanup(tree.unlock)
60
graph = tree.branch.repository.get_graph()
61
ancestry_graph = graph.get_parent_map(
62
tree.branch.repository.all_revision_ids())
63
self.assertEqual({'rev-1':('null:',),
65
'rev-1.1.1':('rev-1', ),
66
'rev-3':('rev-2', 'rev-1.1.1', ),
69
def test_revision_ids_are_utf8(self):
70
wt = self.make_branch_and_tree('tree')
71
wt.commit('f', rev_id='rev1')
72
wt.commit('f', rev_id='rev2')
73
wt.commit('f', rev_id='rev3')
85
75
br = self.get_branch()
86
br.append_revision("rev1")
87
self.assertEquals(br.revision_history(), ["rev1",])
88
br.append_revision("rev2", "rev3")
89
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
77
br.set_revision_history(['rev1', 'rev2', 'rev3'])
78
rh = br.revision_history()
79
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
80
for revision_id in rh:
81
self.assertIsInstance(revision_id, str)
82
last = br.last_revision()
83
self.assertEqual('rev3', last)
84
self.assertIsInstance(last, str)
85
revno, last = br.last_revision_info()
86
self.assertEqual(3, revno)
87
self.assertEqual('rev3', last)
88
self.assertIsInstance(last, str)
91
90
def test_fetch_revisions(self):
92
91
"""Test fetch-revision operation."""
93
get_transport(self.get_url()).mkdir('b1')
94
get_transport(self.get_url()).mkdir('b2')
95
92
wt = self.make_branch_and_tree('b1')
97
b2 = self.make_branch('b2')
98
file('b1/foo', 'w').write('hello')
94
self.build_tree_contents([('b1/foo', 'hello')])
99
95
wt.add(['foo'], ['foo-id'])
100
96
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
102
mutter('start fetch')
98
b2 = self.make_branch('b2')
103
99
self.assertEqual((1, []), b2.fetch(b1))
105
101
rev = b2.repository.get_revision('revision-1')
106
102
tree = b2.repository.revision_tree('revision-1')
104
self.addCleanup(tree.unlock)
107
105
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
109
107
def test_get_revision_delta(self):
140
136
tree_b.branch.repository.fetch(tree_a.branch.repository)
141
137
return tree_a, tree_b
143
def test_clone_branch(self):
144
"""Copy the stores from one branch to another"""
145
tree_a, tree_b = self.get_balanced_branch_pair()
146
tree_b.commit("silly commit")
148
# this fails to test that the history from a was not used.
149
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
self.assertEqual(tree_a.branch.revision_history(),
151
dir_c.open_branch().revision_history())
153
139
def test_clone_partial(self):
154
140
"""Copy only part of the history of a branch."""
155
141
# TODO: RBC 20060208 test with a revision not on revision-history.
156
142
# What should that behaviour be? Emailed the list.
157
wt_a = self.make_branch_and_tree('a')
158
self.build_tree(['a/one'])
160
wt_a.commit('commit one', rev_id='1')
161
self.build_tree(['a/two'])
163
wt_a.commit('commit two', rev_id='2')
164
repo_b = self.make_repository('b')
165
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
self.assertEqual('1', br_b.last_revision())
169
def test_sprout_partial(self):
170
# test sprouting with a prefix of the revision-history.
171
# also needs not-on-revision-history behaviour defined.
172
wt_a = self.make_branch_and_tree('a')
173
self.build_tree(['a/one'])
175
wt_a.commit('commit one', rev_id='1')
176
self.build_tree(['a/two'])
178
wt_a.commit('commit two', rev_id='2')
179
repo_b = self.make_repository('b')
180
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
143
# First, make a branch with two commits.
144
wt_a = self.make_branch_and_tree('a')
145
self.build_tree(['a/one'])
147
wt_a.commit('commit one', rev_id='1')
148
self.build_tree(['a/two'])
150
wt_a.commit('commit two', rev_id='2')
151
# Now make a copy of the repository.
152
repo_b = self.make_repository('b')
153
wt_a.branch.repository.copy_content_into(repo_b)
154
# wt_a might be a lightweight checkout, so get a hold of the actual
155
# branch (because you can't do a partial clone of a lightweight
157
branch = wt_a.branch.bzrdir.open_branch()
158
# Then make a branch where the new repository is, but specify a revision
159
# ID. The new branch's history will stop at the specified revision.
160
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
182
161
self.assertEqual('1', br_b.last_revision())
184
163
def get_parented_branch(self):
230
200
branch.set_submit_branch('sftp://example.net')
231
201
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
203
def test_public_branch(self):
    """The public location starts unset and can be set, changed and cleared."""
    made = self.make_branch('branch')
    # A freshly made branch reports no public location.
    self.assertEqual(made.get_public_branch(), None)
    # Each newly set location must be the one read back, including when it
    # replaces an earlier value.
    for location in ('sftp://example.com', 'sftp://example.net'):
        made.set_public_branch(location)
        self.assertEqual(made.get_public_branch(), location)
    # Setting None clears the location again.
    made.set_public_branch(None)
    self.assertEqual(made.get_public_branch(), None)
233
214
def test_record_initial_ghost(self):
234
215
"""Branches should support having ghosts."""
235
216
wt = self.make_branch_and_tree('.')
236
217
wt.set_parent_ids(['non:existent@rev--ision--0--2'],
237
218
allow_leftmost_as_ghost=True)
219
self.assertEqual(['non:existent@rev--ision--0--2'],
238
221
rev_id = wt.commit('commit against a ghost first parent.')
239
222
rev = wt.branch.repository.get_revision(rev_id)
240
223
self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
272
255
wt.commit("base", allow_pointless=True, rev_id='A')
273
256
from bzrlib.testament import Testament
274
257
strategy = gpg.LoopbackGPGStrategy(None)
258
branch.repository.lock_write()
259
branch.repository.start_write_group()
275
260
branch.repository.sign_revision('A', strategy)
276
self.assertEqual(Testament.from_revision(branch.repository,
277
'A').as_short_text(),
261
branch.repository.commit_write_group()
262
branch.repository.unlock()
263
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
264
Testament.from_revision(branch.repository,
265
'A').as_short_text() +
266
'-----END PSEUDO-SIGNED CONTENT-----\n',
278
267
branch.repository.get_signature_text('A'))
280
269
def test_store_signature(self):
281
270
wt = self.make_branch_and_tree('.')
282
271
branch = wt.branch
283
branch.repository.store_revision_signature(
284
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
274
branch.repository.start_write_group()
276
branch.repository.store_revision_signature(
277
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
279
branch.repository.abort_write_group()
282
branch.repository.commit_write_group()
285
# A signature without a revision should not be accessible.
285
286
self.assertRaises(errors.NoSuchRevision,
286
287
branch.repository.has_signature_for_revision_id,
288
289
wt.commit("base", allow_pointless=True, rev_id='A')
289
self.assertEqual('FOO',
290
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
291
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
290
292
branch.repository.get_signature_text('A'))
292
294
def test_branch_keeps_signatures(self):
293
295
wt = self.make_branch_and_tree('source')
294
296
wt.commit('A', allow_pointless=True, rev_id='A')
295
wt.branch.repository.sign_revision('A',
296
gpg.LoopbackGPGStrategy(None))
297
repo = wt.branch.repository
299
repo.start_write_group()
300
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
301
repo.commit_write_group()
297
303
#FIXME: clone should work to urls,
298
304
# wt.clone should work to disks.
299
305
self.build_tree(['target/'])
300
d2 = wt.bzrdir.clone('target')
301
self.assertEqual(wt.branch.repository.get_signature_text('A'),
306
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
307
self.assertEqual(repo.get_signature_text('A'),
302
308
d2.open_repository().get_signature_text('A'))
310
def test_missing_revisions(self):
311
t1 = self.make_branch_and_tree('b1')
312
rev1 = t1.commit('one')
313
t2 = t1.bzrdir.sprout('b2').open_workingtree()
314
rev2 = t1.commit('two')
315
rev3 = t1.commit('three')
317
self.assertEqual([rev2, rev3],
318
self.applyDeprecated(deprecated_in((1, 6, 0)),
319
t2.branch.missing_revisions, t1.branch))
322
self.applyDeprecated(deprecated_in((1, 6, 0)),
323
t2.branch.missing_revisions, t1.branch, stop_revision=1))
324
self.assertEqual([rev2],
325
self.applyDeprecated(deprecated_in((1, 6, 0)),
326
t2.branch.missing_revisions, t1.branch, stop_revision=2))
327
self.assertEqual([rev2, rev3],
328
self.applyDeprecated(deprecated_in((1, 6, 0)),
329
t2.branch.missing_revisions, t1.branch, stop_revision=3))
331
self.assertRaises(errors.NoSuchRevision,
332
self.applyDeprecated, deprecated_in((1, 6, 0)),
333
t2.branch.missing_revisions, t1.branch, stop_revision=4)
335
rev4 = t2.commit('four')
336
self.assertRaises(errors.DivergedBranches,
337
self.applyDeprecated, deprecated_in((1, 6, 0)),
338
t2.branch.missing_revisions, t1.branch)
304
340
def test_nicks(self):
305
"""Branch nicknames"""
341
"""Test explicit and implicit branch nicknames.
343
Nicknames are implicitly the name of the branch's directory, unless an
344
explicit nickname is set. That is, an explicit nickname always
345
overrides the implicit one.
306
347
t = get_transport(self.get_url())
308
348
branch = self.make_branch('bzr.dev')
349
# The nick will be 'bzr.dev', because there is no explicit nick set.
309
350
self.assertEqual(branch.nick, 'bzr.dev')
351
# Move the branch to a different directory, 'bzr.ab'. Now that branch
352
# will report its nick as 'bzr.ab'.
310
353
t.move('bzr.dev', 'bzr.ab')
311
354
branch = Branch.open(self.get_url('bzr.ab'))
312
355
self.assertEqual(branch.nick, 'bzr.ab')
313
branch.nick = "Aaron's branch"
314
branch.nick = "Aaron's branch"
318
branch.control_files.controlfilename("branch.conf")
356
# Set the branch nick explicitly. This will ensure there's a branch
357
# config file in the branch.
358
branch.nick = "Aaron's branch"
359
if not isinstance(branch, remote.RemoteBranch):
360
self.failUnless(branch._transport.has("branch.conf"))
361
# Because the nick has been set explicitly, the nick is now always
362
# "Aaron's branch", regardless of directory name.
322
363
self.assertEqual(branch.nick, "Aaron's branch")
323
364
t.move('bzr.ab', 'integration')
324
365
branch = Branch.open(self.get_url('integration'))
391
428
tree_a = self.make_branch_and_tree('a')
392
429
branch_a = tree_a.branch
393
430
checkout_b = branch_a.create_checkout('b')
431
self.assertEqual('null:', checkout_b.last_revision())
394
432
checkout_b.commit('rev1', rev_id='rev1')
395
433
self.assertEqual('rev1', branch_a.last_revision())
396
434
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
398
436
checkout_c = branch_a.create_checkout('c', lightweight=True)
437
self.assertEqual('rev1', checkout_c.last_revision())
399
438
checkout_c.commit('rev2', rev_id='rev2')
400
439
self.assertEqual('rev2', branch_a.last_revision())
401
440
self.assertEqual(checkout_c.branch.base, branch_a.base)
404
443
checkout_d = branch_a.create_checkout('d', lightweight=True)
444
self.assertEqual('rev2', checkout_d.last_revision())
406
446
checkout_e = branch_a.create_checkout('e')
447
self.assertEqual('rev2', checkout_e.last_revision())
408
449
def test_create_anonymous_lightweight_checkout(self):
409
"""A checkout from a readonly branch should succeed."""
450
"""A lightweight checkout from a readonly branch should succeed."""
410
451
tree_a = self.make_branch_and_tree('a')
411
452
rev_id = tree_a.commit('put some content in the branch')
412
source_branch = bzrlib.branch.Branch.open(
413
'readonly+' + tree_a.bzrdir.root_transport.base)
453
# open the branch via a readonly transport
454
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
414
455
# sanity check that the test will be valid
415
456
self.assertRaises((errors.LockError, errors.TransportNotPossible),
416
457
source_branch.lock_write)
417
458
checkout = source_branch.create_checkout('c', lightweight=True)
418
459
self.assertEqual(rev_id, checkout.last_revision())
461
def test_create_anonymous_heavyweight_checkout(self):
    """A regular checkout from a readonly branch should succeed."""
    source_tree = self.make_branch_and_tree('a')
    committed_id = source_tree.commit('put some content in the branch')
    # Reopen the same branch through a readonly URL.
    readonly_url = self.get_readonly_url('a')
    source_branch = bzrlib.branch.Branch.open(readonly_url)
    # Sanity check that the test will be valid: the source branch must
    # refuse a write lock.
    refusals = (errors.LockError, errors.TransportNotPossible)
    self.assertRaises(refusals, source_branch.lock_write)
    # No lightweight flag is passed, so this is the default checkout kind.
    heavy_checkout = source_branch.create_checkout('c')
    self.assertEqual(committed_id, heavy_checkout.last_revision())
473
def test_set_revision_history(self):
474
tree = self.make_branch_and_tree('a')
475
tree.commit('a commit', rev_id='rev1')
477
br.set_revision_history(["rev1"])
478
self.assertEquals(br.revision_history(), ["rev1"])
479
br.set_revision_history([])
480
self.assertEquals(br.revision_history(), [])
421
483
class ChrootedTests(TestCaseWithBranch):
422
484
"""A support class that provides readonly urls outside the local namespace.
506
568
self.assertEqual(['lw', 'ul'], branch._calls)
509
class TestBranchTransaction(TestCaseWithBranch):
512
super(TestBranchTransaction, self).setUp()
515
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    # assertIsInstance reports the offending object on failure, whereas
    # failUnless(isinstance(...)) only says "False is not true".
    self.assertIsInstance(self.get_branch().get_transaction(),
                          transactions.PassThroughTransaction)
520
def test__set_new_transaction(self):
    """Installing a transaction on the test branch should not raise."""
    readonly = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(readonly)
523
def test__set_over_existing_transaction_raises(self):
    """Setting a second transaction over an existing one raises LockError."""
    first = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(first)
    # A transaction is already installed, so this second call must fail.
    setter = self.get_branch()._set_transaction
    second = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, setter, second)
529
def test_finish_no_transaction_raises(self):
    """Finishing when no transaction was set raises LockError."""
    finisher = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finisher)
532
def test_finish_readonly_transaction_works(self):
    """Finishing a readonly transaction leaves no transaction behind."""
    readonly = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(readonly)
    self.get_branch()._finish_transaction()
    # After finishing, the control files must hold no transaction.
    remaining = self.get_branch().control_files._transaction
    self.assertEqual(None, remaining)
537
def test_unlock_calls_finish(self):
    """Unlocking the branch should finish the transaction it holds."""
    self.get_branch().lock_read()
    # Replace the transaction installed while locking with an instrumented
    # one so the calls made during unlock can be inspected afterwards.
    recorder = InstrumentedTransaction()
    self.get_branch().control_files._transaction = recorder
    self.get_branch().unlock()
    self.assertEqual(['finish'], recorder.calls)
544
def test_lock_read_acquires_ro_transaction(self):
    """Read-locking the branch should give a ReadOnlyTransaction."""
    self.get_branch().lock_read()
    try:
        # assertIsInstance shows the actual transaction object on failure.
        self.assertIsInstance(self.get_branch().get_transaction(),
                              transactions.ReadOnlyTransaction)
    finally:
        # Release the read lock even if the assertion fails, so a failing
        # test does not leave the branch locked.
        self.get_branch().unlock()
550
def test_lock_write_acquires_write_transaction(self):
    """Write-locking the branch should install a WriteTransaction."""
    self.get_branch().lock_write()
    try:
        # Cannot use get_transaction as it's magic; inspect the
        # transaction stored on the control files directly.
        self.assertIsInstance(
            self.get_branch().control_files._transaction,
            transactions.WriteTransaction)
    finally:
        # Release the write lock even if the assertion fails, so a failing
        # test does not leave the branch locked.
        self.get_branch().unlock()
558
571
class TestBranchPushLocations(TestCaseWithBranch):
560
573
def test_get_push_location_unset(self):
565
578
ensure_config_dir_exists)
566
579
ensure_config_dir_exists()
567
580
fn = locations_config_filename()
568
print >> open(fn, 'wt'), ("[%s]\n"
569
"push_location=foo" %
570
self.get_branch().base[:-1])
581
open(fn, 'wt').write(("[%s]\n"
582
"push_location=foo\n" %
583
self.get_branch().base[:-1]))
571
584
self.assertEqual("foo", self.get_branch().get_push_location())
573
586
def test_set_push_location(self):
574
from bzrlib.config import (locations_config_filename,
575
ensure_config_dir_exists)
576
ensure_config_dir_exists()
577
fn = locations_config_filename()
578
587
branch = self.get_branch()
579
588
branch.set_push_location('foo')
580
local_path = urlutils.local_path_from_url(branch.base[:-1])
581
self.assertFileEqual("[%s]\n"
582
"push_location = foo" % local_path,
585
# TODO RBC 20051029 test getting a push location from a branch in a
586
# recursive section - that is, it appends the branch name.
589
self.assertEqual('foo', branch.get_push_location())
589
592
class TestFormat(TestCaseWithBranch):
590
593
"""Tests for the format itself."""
595
def test_get_reference(self):
596
"""get_reference on all regular branches should return None."""
597
if not self.branch_format.is_supported():
598
# unsupported formats are not loopback testable
599
# because the default open will not open them and
600
# they may not be initializable.
602
made_branch = self.make_branch('.')
603
self.assertEqual(None,
604
made_branch._format.get_reference(made_branch.bzrdir))
606
def test_set_reference(self):
607
"""set_reference on all regular branches should be callable."""
608
if not self.branch_format.is_supported():
609
# unsupported formats are not loopback testable
610
# because the default open will not open them and
611
# they may not be initializable.
613
this_branch = self.make_branch('this')
614
other_branch = self.make_branch('other')
616
this_branch._format.set_reference(this_branch.bzrdir, other_branch)
617
except NotImplementedError:
621
ref = this_branch._format.get_reference(this_branch.bzrdir)
622
self.assertEqual(ref, other_branch.base)
592
624
def test_format_initialize_find_open(self):
593
625
# loopback test to check the current format initializes to itself.
594
626
if not self.branch_format.is_supported():
621
653
except NotImplementedError:
623
655
self.assertEqual(self.branch_format,
624
branch.BranchFormat.find_format(opened_control))
656
opened_control.find_branch_format())
659
class TestBound(TestCaseWithBranch):
661
def test_bind_unbind(self):
662
branch = self.make_branch('1')
663
branch2 = self.make_branch('2')
666
except errors.UpgradeRequired:
667
raise tests.TestNotApplicable('Format does not support binding')
668
self.assertTrue(branch.unbind())
669
self.assertFalse(branch.unbind())
670
self.assertIs(None, branch.get_bound_location())
672
def test_old_bound_location(self):
673
branch = self.make_branch('branch1')
675
self.assertIs(None, branch.get_old_bound_location())
676
except errors.UpgradeRequired:
677
raise tests.TestNotApplicable(
678
'Format does not store old bound locations')
679
branch2 = self.make_branch('branch2')
681
self.assertIs(None, branch.get_old_bound_location())
683
self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
685
def test_bind_diverged(self):
686
tree_a = self.make_branch_and_tree('tree_a')
687
tree_a.commit('rev1a')
688
tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
689
tree_a.commit('rev2a')
690
tree_b.commit('rev2b')
692
tree_b.branch.bind(tree_a.branch)
693
except errors.UpgradeRequired:
694
raise tests.TestNotApplicable('Format does not support binding')
697
class TestStrict(TestCaseWithBranch):
699
def test_strict_history(self):
700
tree1 = self.make_branch_and_tree('tree1')
702
tree1.branch.set_append_revisions_only(True)
703
except errors.UpgradeRequired:
704
raise TestSkipped('Format does not support strict history')
705
tree1.commit('empty commit')
706
tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
707
tree2.commit('empty commit 2')
708
tree1.pull(tree2.branch)
709
tree1.commit('empty commit 3')
710
tree2.commit('empty commit 4')
711
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
712
tree2.merge_from_branch(tree1.branch)
713
tree2.commit('empty commit 5')
714
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
716
tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
717
tree3.merge_from_branch(tree2.branch)
718
tree3.commit('empty commit 6')
719
tree2.pull(tree3.branch)