/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Vincent Ladeuil
  • Date: 2010-09-24 09:56:50 UTC
  • mto: This revision was merged to the branch mainline in revision 5446.
  • Revision ID: v.ladeuil+lp@free.fr-20100924095650-okd49n2o18q9zkmb
Clarify SRU bug nomination.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
122
123
        self.failUnlessExists(filename)
123
124
 
124
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
125
139
class TestTransportScenarios(tests.TestCase):
126
140
    """A group of tests that test the transport implementation adaption core.
127
141
 
208
222
    def test_scenarios(self):
209
223
        # check that constructor parameters are passed through to the adapted
210
224
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
212
226
        vfs_factory = "v"
213
227
        server1 = "a"
214
228
        server2 = "b"
312
326
        from bzrlib.tests.per_interrepository import make_scenarios
313
327
        server1 = "a"
314
328
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
        scenarios = make_scenarios(server1, server2, formats)
317
331
        self.assertEqual([
318
332
            ('C0,str,str',
319
333
             {'repository_format': 'C1',
320
334
              'repository_format_to': 'C2',
321
335
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
323
338
            ('D0,str,str',
324
339
             {'repository_format': 'D1',
325
340
              'repository_format_to': 'D2',
326
341
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
328
344
            scenarios)
329
345
 
330
346
 
609
625
                l.attempt_lock()
610
626
        test = TestDanglingLock('test_function')
611
627
        result = test.run()
 
628
        total_failures = result.errors + result.failures
612
629
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
630
            self.assertLength(1, total_failures)
614
631
        else:
615
632
            # When _lock_check_thorough is disabled, then we don't trigger a
616
633
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
634
            self.assertLength(0, total_failures)
618
635
 
619
636
 
620
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
638
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
639
 
623
640
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
641
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
642
        self.vfs_transport_factory = memory.MemoryServer
627
643
        self.transport_readonly_server = None
629
645
        # for the server
630
646
        url = self.get_readonly_url()
631
647
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
634
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
653
 
638
654
    def test_get_readonly_url_http(self):
639
655
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
656
        from bzrlib.transport.http import HttpTransportBase
642
657
        self.transport_server = test_server.LocalURLServer
643
658
        self.transport_readonly_server = HttpServer
645
660
        url = self.get_readonly_url()
646
661
        url2 = self.get_readonly_url('foo/bar')
647
662
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
650
665
        self.failUnless(isinstance(t, HttpTransportBase))
651
666
        self.failUnless(isinstance(t2, HttpTransportBase))
652
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
705
class TestChrootedTest(tests.ChrootedTestCase):
691
706
 
692
707
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
695
709
        url = t.base
696
710
        self.assertEqual(url, t.clone('..').base)
697
711
 
803
817
        self.requireFeature(test_lsprof.LSProfFeature)
804
818
        result_stream = StringIO()
805
819
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
820
            result_stream,
807
821
            descriptions=0,
808
822
            verbosity=2,
809
823
            )
839
853
        """A KnownFailure being raised should trigger several result actions."""
840
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
855
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
856
            def report_tests_starting(self): pass
844
857
            def report_known_failure(self, test, err=None, details=None):
845
858
                self._call = test, 'known failure'
846
859
        result = InstrumentedTestResult(None, None, None, None)
864
877
        # verbose test output formatting
865
878
        result_stream = StringIO()
866
879
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
880
            result_stream,
868
881
            descriptions=0,
869
882
            verbosity=2,
870
883
            )
880
893
        output = result_stream.getvalue()[prefix:]
881
894
        lines = output.splitlines()
882
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
883
899
        self.assertEqual(lines[1], '    foo')
884
900
        self.assertEqual(2, len(lines))
885
901
 
893
909
        """Test the behaviour of invoking addNotSupported."""
894
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
911
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
898
913
            def report_unsupported(self, test, feature):
899
914
                self._call = test, feature
900
915
        result = InstrumentedTestResult(None, None, None, None)
919
934
        # verbose test output formatting
920
935
        result_stream = StringIO()
921
936
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
937
            result_stream,
923
938
            descriptions=0,
924
939
            verbosity=2,
925
940
            )
939
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
956
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
944
958
            def addNotSupported(self, test, feature):
945
959
                self._call = test, feature
946
960
        result = InstrumentedTestResult(None, None, None, None)
988
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1003
            calls = 0
990
1004
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1005
        result = InstrumentedTestResult(None, None, None, None)
993
1006
        def test_function():
994
1007
            pass
996
1009
        test.run(result)
997
1010
        self.assertEquals(1, result.calls)
998
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
999
1025
 
1000
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1027
 
1022
1048
        because of our use of global state.
1023
1049
        """
1024
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1051
        try:
1027
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1053
            return testrunner.run(test)
1030
1054
        finally:
1031
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1056
 
1034
1057
    def test_known_failure_failed_run(self):
1035
1058
        # run a test that generates a known failure which should be printed in
1213
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1237
        self.assertLength(1, self._get_source_tree_calls)
1215
1238
 
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1250
 
1216
1251
    def test_startTestRun(self):
1217
1252
        """run should call result.startTestRun()"""
1218
1253
        calls = []
1421
1456
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1457
        output_stream = StringIO()
1423
1458
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1459
            output_stream,
1425
1460
            descriptions=0,
1426
1461
            verbosity=2)
1427
1462
        sample_test.run(result)
1655
1690
        self.assertEqual('original', obj.test_attr)
1656
1691
 
1657
1692
 
 
1693
class TestTestCloning(tests.TestCase):
 
1694
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1695
 
 
1696
    def test_cloned_testcase_does_not_share_details(self):
 
1697
        """A TestCase cloned with clone_test does not share mutable attributes
 
1698
        such as details or cleanups.
 
1699
        """
 
1700
        class Test(tests.TestCase):
 
1701
            def test_foo(self):
 
1702
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1703
        orig_test = Test('test_foo')
 
1704
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1705
        orig_test.run(unittest.TestResult())
 
1706
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1707
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1708
 
 
1709
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1710
        """Applying two levels of scenarios to a test preserves the attributes
 
1711
        added by both scenarios.
 
1712
        """
 
1713
        class Test(tests.TestCase):
 
1714
            def test_foo(self):
 
1715
                pass
 
1716
        test = Test('test_foo')
 
1717
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1718
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1719
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1720
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1721
        all_tests = list(tests.iter_suite_tests(suite))
 
1722
        self.assertLength(4, all_tests)
 
1723
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1724
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1725
 
 
1726
 
1658
1727
# NB: Don't delete this; it's not actually from 0.11!
1659
1728
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1729
def sample_deprecated_function():
2339
2408
            os.chdir = orig_chdir
2340
2409
        self.assertEqual(['foo', 'current'], chdirs)
2341
2410
 
 
2411
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2412
        self.get_source_path = lambda: ""
 
2413
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2414
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2415
 
2342
2416
 
2343
2417
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2418
    """Tests that really need to do things with an external bzr."""
2960
3034
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3035
 
2962
3036
 
 
3037
class TestThreadLeakDetection(tests.TestCase):
 
3038
    """Ensure when tests leak threads we detect and report it"""
 
3039
 
 
3040
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3041
        def __init__(self):
 
3042
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3043
            self.leaks = []
 
3044
        def _report_thread_leak(self, test, leaks, alive):
 
3045
            self.leaks.append((test, leaks))
 
3046
 
 
3047
    def test_testcase_without_addCleanups(self):
 
3048
        """Check old TestCase instances don't break with leak detection"""
 
3049
        class Test(unittest.TestCase):
 
3050
            def runTest(self):
 
3051
                pass
 
3052
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3053
        result = self.LeakRecordingResult()
 
3054
        test = Test()
 
3055
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3056
        result.startTestRun()
 
3057
        test.run(result)
 
3058
        result.stopTestRun()
 
3059
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3060
        self.assertEqual(result.leaks, [])
 
3061
        
 
3062
    def test_thread_leak(self):
 
3063
        """Ensure a thread that outlives the running of a test is reported
 
3064
 
 
3065
        Uses a thread that blocks on an event, and is started by the inner
 
3066
        test case. As the thread outlives the inner case's run, it should be
 
3067
        detected as a leak, but the event is then set so that the thread can
 
3068
        be safely joined in cleanup so it's not leaked for real.
 
3069
        """
 
3070
        event = threading.Event()
 
3071
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3072
        class Test(tests.TestCase):
 
3073
            def test_leak(self):
 
3074
                thread.start()
 
3075
        result = self.LeakRecordingResult()
 
3076
        test = Test("test_leak")
 
3077
        self.addCleanup(thread.join)
 
3078
        self.addCleanup(event.set)
 
3079
        result.startTestRun()
 
3080
        test.run(result)
 
3081
        result.stopTestRun()
 
3082
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3083
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3084
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3085
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3086
 
 
3087
    def test_multiple_leaks(self):
 
3088
        """Check multiple leaks are blamed on the test cases at fault
 
3089
 
 
3090
        Same concept as the previous test, but has one inner test method that
 
3091
        leaks two threads, and one that doesn't leak at all.
 
3092
        """
 
3093
        event = threading.Event()
 
3094
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3095
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3096
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3097
        class Test(tests.TestCase):
 
3098
            def test_first_leak(self):
 
3099
                thread_b.start()
 
3100
            def test_second_no_leak(self):
 
3101
                pass
 
3102
            def test_third_leak(self):
 
3103
                thread_c.start()
 
3104
                thread_a.start()
 
3105
        result = self.LeakRecordingResult()
 
3106
        first_test = Test("test_first_leak")
 
3107
        third_test = Test("test_third_leak")
 
3108
        self.addCleanup(thread_a.join)
 
3109
        self.addCleanup(thread_b.join)
 
3110
        self.addCleanup(thread_c.join)
 
3111
        self.addCleanup(event.set)
 
3112
        result.startTestRun()
 
3113
        unittest.TestSuite(
 
3114
            [first_test, Test("test_second_no_leak"), third_test]
 
3115
            ).run(result)
 
3116
        result.stopTestRun()
 
3117
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3118
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3119
        self.assertEqual(result.leaks, [
 
3120
            (first_test, set([thread_b])),
 
3121
            (third_test, set([thread_a, thread_c]))])
 
3122
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3123
 
 
3124
 
2963
3125
class TestRunSuite(tests.TestCase):
2964
3126
 
2965
3127
    def test_runner_class(self):