/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    gpg,
27
27
    urlutils,
28
28
    transactions,
 
29
    remote,
29
30
    repository,
30
31
    )
31
32
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
39
40
from bzrlib.osutils import getcwd
40
41
import bzrlib.revision
41
42
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
43
from bzrlib.tests.branch_implementations import TestCaseWithBranch
43
44
from bzrlib.tests.HttpServer import HttpServer
44
45
from bzrlib.trace import mutter
45
46
from bzrlib.transport import get_transport
48
49
from bzrlib.workingtree import WorkingTree
49
50
 
50
51
 
51
 
# TODO: Make a branch using basis branch, and check that it 
52
 
# doesn't request any files that could have been avoided, by 
53
 
# hooking into the Transport.
54
 
 
55
 
 
56
 
class TestCaseWithBranch(TestCaseWithBzrDir):
57
 
 
58
 
    def setUp(self):
59
 
        super(TestCaseWithBranch, self).setUp()
60
 
        self.branch = None
61
 
 
62
 
    def get_branch(self):
63
 
        if self.branch is None:
64
 
            self.branch = self.make_branch('')
65
 
        return self.branch
66
 
 
67
 
    def make_branch(self, relpath, format=None):
68
 
        repo = self.make_repository(relpath, format=format)
69
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
70
 
        # Skipped is the wrong exception to raise.
71
 
        try:
72
 
            return self.branch_format.initialize(repo.bzrdir)
73
 
        except errors.UninitializableFormat:
74
 
            raise TestSkipped('Uninitializable branch format')
75
 
 
76
 
    def make_repository(self, relpath, shared=False, format=None):
77
 
        made_control = self.make_bzrdir(relpath, format=format)
78
 
        return made_control.create_repository(shared=shared)
79
 
 
80
 
 
81
52
class TestBranch(TestCaseWithBranch):
82
53
 
83
54
    def test_append_revisions(self):
84
55
        """Test appending more than one revision"""
 
56
        wt = self.make_branch_and_tree('tree')
 
57
        wt.commit('f', rev_id='rev1')
 
58
        wt.commit('f', rev_id='rev2')
 
59
        wt.commit('f', rev_id='rev3')
 
60
 
85
61
        br = self.get_branch()
 
62
        br.fetch(wt.branch)
86
63
        br.append_revision("rev1")
87
64
        self.assertEquals(br.revision_history(), ["rev1",])
88
65
        br.append_revision("rev2", "rev3")
89
66
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
 
67
        self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
 
68
 
 
69
    def test_create_tree_with_merge(self):
 
70
        tree = self.create_tree_with_merge()
 
71
        ancestry_graph = tree.branch.repository.get_revision_graph('rev-3')
 
72
        self.assertEqual({'rev-1':[],
 
73
                          'rev-2':['rev-1'],
 
74
                          'rev-1.1.1':['rev-1'],
 
75
                          'rev-3':['rev-2', 'rev-1.1.1'],
 
76
                         }, ancestry_graph)
 
77
 
 
78
    def test_revision_ids_are_utf8(self):
 
79
        wt = self.make_branch_and_tree('tree')
 
80
        wt.commit('f', rev_id='rev1')
 
81
        wt.commit('f', rev_id='rev2')
 
82
        wt.commit('f', rev_id='rev3')
 
83
 
 
84
        br = self.get_branch()
 
85
        br.fetch(wt.branch)
 
86
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
 
87
        rh = br.revision_history()
 
88
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
 
89
        for revision_id in rh:
 
90
            self.assertIsInstance(revision_id, str)
 
91
        last = br.last_revision()
 
92
        self.assertEqual('rev3', last)
 
93
        self.assertIsInstance(last, str)
 
94
        revno, last = br.last_revision_info()
 
95
        self.assertEqual(3, revno)
 
96
        self.assertEqual('rev3', last)
 
97
        self.assertIsInstance(last, str)
90
98
 
91
99
    def test_fetch_revisions(self):
92
100
        """Test fetch-revision operation."""
93
 
        get_transport(self.get_url()).mkdir('b1')
94
 
        get_transport(self.get_url()).mkdir('b2')
95
101
        wt = self.make_branch_and_tree('b1')
96
102
        b1 = wt.branch
97
 
        b2 = self.make_branch('b2')
98
 
        file('b1/foo', 'w').write('hello')
 
103
        self.build_tree_contents([('b1/foo', 'hello')])
99
104
        wt.add(['foo'], ['foo-id'])
100
105
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
101
106
 
102
 
        mutter('start fetch')
 
107
        b2 = self.make_branch('b2')
103
108
        self.assertEqual((1, []), b2.fetch(b1))
104
109
 
105
110
        rev = b2.repository.get_revision('revision-1')
124
129
 
125
130
    def get_unbalanced_tree_pair(self):
126
131
        """Return two branches, a and b, with one file in a."""
127
 
        get_transport(self.get_url()).mkdir('a')
128
132
        tree_a = self.make_branch_and_tree('a')
129
 
        file('a/b', 'wb').write('b')
 
133
        self.build_tree_contents([('a/b', 'b')])
130
134
        tree_a.add('b')
131
135
        tree_a.commit("silly commit", rev_id='A')
132
136
 
133
 
        get_transport(self.get_url()).mkdir('b')
134
137
        tree_b = self.make_branch_and_tree('b')
135
138
        return tree_a, tree_b
136
139
 
140
143
        tree_b.branch.repository.fetch(tree_a.branch.repository)
141
144
        return tree_a, tree_b
142
145
 
143
 
    def test_clone_branch(self):
144
 
        """Copy the stores from one branch to another"""
145
 
        tree_a, tree_b = self.get_balanced_branch_pair()
146
 
        tree_b.commit("silly commit")
147
 
        os.mkdir('c')
148
 
        # this fails to test that the history from a was not used.
149
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
 
        self.assertEqual(tree_a.branch.revision_history(),
151
 
                         dir_c.open_branch().revision_history())
152
 
 
153
146
    def test_clone_partial(self):
154
147
        """Copy only part of the history of a branch."""
155
148
        # TODO: RBC 20060208 test with a revision not on revision-history.
156
149
        #       what should that behaviour be ? Emailed the list.
 
150
        # First, make a branch with two commits.
157
151
        wt_a = self.make_branch_and_tree('a')
158
152
        self.build_tree(['a/one'])
159
153
        wt_a.add(['one'])
161
155
        self.build_tree(['a/two'])
162
156
        wt_a.add(['two'])
163
157
        wt_a.commit('commit two', rev_id='2')
 
158
        # Now make a copy of the repository.
164
159
        repo_b = self.make_repository('b')
165
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
 
160
        wt_a.branch.repository.copy_content_into(repo_b)
 
161
        # wt_a might be a lightweight checkout, so get a hold of the actual
 
162
        # branch (because you can't do a partial clone of a lightweight
 
163
        # checkout).
 
164
        branch = wt_a.branch.bzrdir.open_branch()
 
165
        # Then make a branch where the new repository is, but specify a revision
 
166
        # ID.  The new branch's history will stop at the specified revision.
 
167
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
167
168
        self.assertEqual('1', br_b.last_revision())
168
169
 
169
170
    def test_sprout_partial(self):
177
178
        wt_a.add(['two'])
178
179
        wt_a.commit('commit two', rev_id='2')
179
180
        repo_b = self.make_repository('b')
180
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
 
181
        repo_a = wt_a.branch.repository
 
182
        repo_a.copy_content_into(repo_b)
 
183
        br_b = wt_a.branch.sprout(repo_b.bzrdir, revision_id='1')
182
184
        self.assertEqual('1', br_b.last_revision())
183
185
 
184
186
    def get_parented_branch(self):
230
232
        branch.set_submit_branch('sftp://example.net')
231
233
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
232
234
        
 
235
    def test_public_branch(self):
 
236
        """public location can be queried and set"""
 
237
        branch = self.make_branch('branch')
 
238
        self.assertEqual(branch.get_public_branch(), None)
 
239
        branch.set_public_branch('sftp://example.com')
 
240
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
 
241
        branch.set_public_branch('sftp://example.net')
 
242
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
 
243
        branch.set_public_branch(None)
 
244
        self.assertEqual(branch.get_public_branch(), None)
 
245
 
233
246
    def test_record_initial_ghost(self):
234
247
        """Branches should support having ghosts."""
235
248
        wt = self.make_branch_and_tree('.')
273
286
        from bzrlib.testament import Testament
274
287
        strategy = gpg.LoopbackGPGStrategy(None)
275
288
        branch.repository.sign_revision('A', strategy)
276
 
        self.assertEqual(Testament.from_revision(branch.repository, 
277
 
                         'A').as_short_text(),
 
289
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
 
290
                         Testament.from_revision(branch.repository,
 
291
                         'A').as_short_text() +
 
292
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
278
293
                         branch.repository.get_signature_text('A'))
279
294
 
280
295
    def test_store_signature(self):
286
301
                          branch.repository.has_signature_for_revision_id,
287
302
                          'A')
288
303
        wt.commit("base", allow_pointless=True, rev_id='A')
289
 
        self.assertEqual('FOO', 
 
304
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
 
305
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
290
306
                         branch.repository.get_signature_text('A'))
291
307
 
292
308
    def test_branch_keeps_signatures(self):
293
309
        wt = self.make_branch_and_tree('source')
294
310
        wt.commit('A', allow_pointless=True, rev_id='A')
295
 
        wt.branch.repository.sign_revision('A',
296
 
            gpg.LoopbackGPGStrategy(None))
 
311
        repo = wt.branch.repository
 
312
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
297
313
        #FIXME: clone should work to urls,
298
314
        # wt.clone should work to disks.
299
315
        self.build_tree(['target/'])
300
 
        d2 = wt.bzrdir.clone('target')
301
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
 
316
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
 
317
        self.assertEqual(repo.get_signature_text('A'),
302
318
                         d2.open_repository().get_signature_text('A'))
303
319
 
304
320
    def test_nicks(self):
305
 
        """Branch nicknames"""
 
321
        """Test explicit and implicit branch nicknames.
 
322
        
 
323
        Nicknames are implicitly the name of the branch's directory, unless an
 
324
        explicit nickname is set.  That is, an explicit nickname always
 
325
        overrides the implicit one.
 
326
        """
306
327
        t = get_transport(self.get_url())
307
 
        t.mkdir('bzr.dev')
308
328
        branch = self.make_branch('bzr.dev')
 
329
        # The nick will be 'bzr.dev', because there is no explicit nick set.
309
330
        self.assertEqual(branch.nick, 'bzr.dev')
 
331
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
 
332
        # will report its nick as 'bzr.ab'.
310
333
        t.move('bzr.dev', 'bzr.ab')
311
334
        branch = Branch.open(self.get_url('bzr.ab'))
312
335
        self.assertEqual(branch.nick, 'bzr.ab')
313
 
        branch.nick = "Aaron's branch"
314
 
        branch.nick = "Aaron's branch"
315
 
        self.failUnless(
316
 
            t.has(
317
 
                t.relpath(
318
 
                    branch.control_files.controlfilename("branch.conf")
319
 
                    )
320
 
                )
321
 
            )
 
336
        # Set the branch nick explicitly.  This will ensure there's a branch
 
337
        # config file in the branch.
 
338
        branch.nick = "Aaron's branch"
 
339
        branch.nick = "Aaron's branch"
 
340
        if not isinstance(branch, remote.RemoteBranch):
 
341
            controlfilename = branch.control_files.controlfilename
 
342
            self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
 
343
        # Because the nick has been set explicitly, the nick is now always
 
344
        # "Aaron's branch", regardless of directory name.
322
345
        self.assertEqual(branch.nick, "Aaron's branch")
323
346
        t.move('bzr.ab', 'integration')
324
347
        branch = Branch.open(self.get_url('integration'))
328
351
 
329
352
    def test_commit_nicks(self):
330
353
        """Nicknames are committed to the revision"""
331
 
        get_transport(self.get_url()).mkdir('bzr.dev')
332
354
        wt = self.make_branch_and_tree('bzr.dev')
333
355
        branch = wt.branch
334
356
        branch.nick = "My happy branch"
335
357
        wt.commit('My commit respect da nick.')
336
358
        committed = branch.repository.get_revision(branch.last_revision())
337
 
        self.assertEqual(committed.properties["branch-nick"], 
 
359
        self.assertEqual(committed.properties["branch-nick"],
338
360
                         "My happy branch")
339
361
 
340
362
    def test_create_open_branch_uses_repository(self):
342
364
            repo = self.make_repository('.', shared=True)
343
365
        except errors.IncompatibleFormat:
344
366
            return
345
 
        repo.bzrdir.root_transport.mkdir('child')
346
 
        child_dir = self.bzrdir_format.initialize('child')
 
367
        child_transport = repo.bzrdir.root_transport.clone('child')
 
368
        child_transport.mkdir('.')
 
369
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
347
370
        try:
348
371
            child_branch = self.branch_format.initialize(child_dir)
349
372
        except errors.UninitializableFormat:
413
436
        """A lightweight checkout from a readonly branch should succeed."""
414
437
        tree_a = self.make_branch_and_tree('a')
415
438
        rev_id = tree_a.commit('put some content in the branch')
416
 
        source_branch = bzrlib.branch.Branch.open(
417
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
 
439
        # open the branch via a readonly transport
 
440
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
418
441
        # sanity check that the test will be valid
419
442
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
420
443
            source_branch.lock_write)
425
448
        """A regular checkout from a readonly branch should succeed."""
426
449
        tree_a = self.make_branch_and_tree('a')
427
450
        rev_id = tree_a.commit('put some content in the branch')
428
 
        source_branch = bzrlib.branch.Branch.open(
429
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
 
451
        # open the branch via a readonly transport
 
452
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
430
453
        # sanity check that the test will be valid
431
454
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
432
455
            source_branch.lock_write)
433
456
        checkout = source_branch.create_checkout('c')
434
457
        self.assertEqual(rev_id, checkout.last_revision())
435
458
 
 
459
    def test_set_revision_history(self):
 
460
        tree = self.make_branch_and_tree('a')
 
461
        tree.commit('a commit', rev_id='rev1')
 
462
        br = tree.branch
 
463
        br.set_revision_history(["rev1"])
 
464
        self.assertEquals(br.revision_history(), ["rev1"])
 
465
        br.set_revision_history([])
 
466
        self.assertEquals(br.revision_history(), [])
 
467
 
436
468
 
437
469
class ChrootedTests(TestCaseWithBranch):
438
470
    """A support class that provides readonly urls outside the local namespace.
444
476
 
445
477
    def setUp(self):
446
478
        super(ChrootedTests, self).setUp()
447
 
        if not self.transport_server == MemoryServer:
 
479
        if not self.vfs_transport_factory == MemoryServer:
448
480
            self.transport_readonly_server = HttpServer
449
481
 
450
482
    def test_open_containing(self):
522
554
        self.assertEqual(['lw', 'ul'], branch._calls)
523
555
 
524
556
 
525
 
class TestBranchTransaction(TestCaseWithBranch):
526
 
 
527
 
    def setUp(self):
528
 
        super(TestBranchTransaction, self).setUp()
529
 
        self.branch = None
530
 
        
531
 
    def test_default_get_transaction(self):
532
 
        """branch.get_transaction on a new branch should give a PassThrough."""
533
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
534
 
                                   transactions.PassThroughTransaction))
535
 
 
536
 
    def test__set_new_transaction(self):
537
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
538
 
 
539
 
    def test__set_over_existing_transaction_raises(self):
540
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
541
 
        self.assertRaises(errors.LockError,
542
 
                          self.get_branch()._set_transaction,
543
 
                          transactions.ReadOnlyTransaction())
544
 
 
545
 
    def test_finish_no_transaction_raises(self):
546
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
547
 
 
548
 
    def test_finish_readonly_transaction_works(self):
549
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
550
 
        self.get_branch()._finish_transaction()
551
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
552
 
 
553
 
    def test_unlock_calls_finish(self):
554
 
        self.get_branch().lock_read()
555
 
        transaction = InstrumentedTransaction()
556
 
        self.get_branch().control_files._transaction = transaction
557
 
        self.get_branch().unlock()
558
 
        self.assertEqual(['finish'], transaction.calls)
559
 
 
560
 
    def test_lock_read_acquires_ro_transaction(self):
561
 
        self.get_branch().lock_read()
562
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
563
 
                                   transactions.ReadOnlyTransaction))
564
 
        self.get_branch().unlock()
565
 
        
566
 
    def test_lock_write_acquires_write_transaction(self):
567
 
        self.get_branch().lock_write()
568
 
        # cannot use get_transaction as its magic
569
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
570
 
                                   transactions.WriteTransaction))
571
 
        self.get_branch().unlock()
572
 
 
573
 
 
574
557
class TestBranchPushLocations(TestCaseWithBranch):
575
558
 
576
559
    def test_get_push_location_unset(self):
587
570
        self.assertEqual("foo", self.get_branch().get_push_location())
588
571
 
589
572
    def test_set_push_location(self):
590
 
        from bzrlib.config import (locations_config_filename,
591
 
                                   ensure_config_dir_exists)
592
 
        ensure_config_dir_exists()
593
 
        fn = locations_config_filename()
594
573
        branch = self.get_branch()
595
574
        branch.set_push_location('foo')
596
 
        local_path = urlutils.local_path_from_url(branch.base[:-1])
597
 
        self.assertFileEqual("[%s]\n"
598
 
                             "push_location = foo\n"
599
 
                             "push_location:policy = norecurse" % local_path,
600
 
                             fn)
601
 
 
602
 
    # TODO RBC 20051029 test getting a push location from a branch in a 
603
 
    # recursive section - that is, it appends the branch name.
 
575
        self.assertEqual('foo', branch.get_push_location())
604
576
 
605
577
 
606
578
class TestFormat(TestCaseWithBranch):
607
579
    """Tests for the format itself."""
608
580
 
 
581
    def test_get_reference(self):
 
582
        """get_reference on all regular branches should return None."""
 
583
        if not self.branch_format.is_supported():
 
584
            # unsupported formats are not loopback testable
 
585
            # because the default open will not open them and
 
586
            # they may not be initializable.
 
587
            return
 
588
        made_branch = self.make_branch('.')
 
589
        self.assertEqual(None,
 
590
            made_branch._format.get_reference(made_branch.bzrdir))
 
591
 
609
592
    def test_format_initialize_find_open(self):
610
593
        # loopback test to check the current format initializes to itself.
611
594
        if not self.branch_format.is_supported():
638
621
        except NotImplementedError:
639
622
            return
640
623
        self.assertEqual(self.branch_format,
641
 
                         branch.BranchFormat.find_format(opened_control))
642
 
 
643
 
 
 
624
                         opened_control.find_branch_format())
 
625
 
 
626
 
 
627
class TestBound(TestCaseWithBranch):
 
628
 
 
629
    def test_bind_unbind(self):
 
630
        branch = self.make_branch('1')
 
631
        branch2 = self.make_branch('2')
 
632
        try:
 
633
            branch.bind(branch2)
 
634
        except errors.UpgradeRequired:
 
635
            raise TestSkipped('Format does not support binding')
 
636
        self.assertTrue(branch.unbind())
 
637
        self.assertFalse(branch.unbind())
 
638
        self.assertIs(None, branch.get_bound_location())
 
639
 
 
640
    def test_old_bound_location(self):
 
641
        branch = self.make_branch('branch1')
 
642
        try:
 
643
            self.assertIs(None, branch.get_old_bound_location())
 
644
        except errors.UpgradeRequired:
 
645
            raise TestSkipped('Format does not store old bound locations')
 
646
        branch2 = self.make_branch('branch2')
 
647
        branch.bind(branch2)
 
648
        self.assertIs(None, branch.get_old_bound_location())
 
649
        branch.unbind()
 
650
        self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
 
651
 
 
652
 
 
653
class TestStrict(TestCaseWithBranch):
 
654
 
 
655
    def test_strict_history(self):
 
656
        tree1 = self.make_branch_and_tree('tree1')
 
657
        try:
 
658
            tree1.branch.set_append_revisions_only(True)
 
659
        except errors.UpgradeRequired:
 
660
            raise TestSkipped('Format does not support strict history')
 
661
        tree1.commit('empty commit')
 
662
        tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
 
663
        tree2.commit('empty commit 2')
 
664
        tree1.pull(tree2.branch)
 
665
        tree1.commit('empty commit 3')
 
666
        tree2.commit('empty commit 4')
 
667
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
 
668
        tree2.merge_from_branch(tree1.branch)
 
669
        tree2.commit('empty commit 5')
 
670
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
 
671
                          tree2.branch)
 
672
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
 
673
        tree3.merge_from_branch(tree2.branch)
 
674
        tree3.commit('empty commit 6')
 
675
        tree2.pull(tree3.branch)