/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

Merge require_testtools_0.9.5_for_selftest to reflect test requirement and resolve news conflict

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
42
47
    lockdir,
43
48
    memorytree,
44
49
    osutils,
45
 
    progress,
46
50
    remote,
47
51
    repository,
48
52
    symbol_versioning,
67
71
    test_sftp_transport,
68
72
    TestUtil,
69
73
    )
70
 
from bzrlib.trace import note
 
74
from bzrlib.trace import note, mutter
71
75
from bzrlib.transport import memory
72
76
from bzrlib.version import _get_bzr_source_tree
73
77
 
77
81
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
82
 
79
83
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
84
class MetaTestLog(tests.TestCase):
93
85
 
94
86
    def test_logging(self):
325
317
        from bzrlib.tests.per_interrepository import make_scenarios
326
318
        server1 = "a"
327
319
        server2 = "b"
328
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
320
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
329
321
        scenarios = make_scenarios(server1, server2, formats)
330
322
        self.assertEqual([
331
323
            ('C0,str,str',
332
324
             {'repository_format': 'C1',
333
325
              'repository_format_to': 'C2',
334
326
              'transport_readonly_server': 'b',
335
 
              'transport_server': 'a'}),
 
327
              'transport_server': 'a',
 
328
              'extra_setup': 'C3'}),
336
329
            ('D0,str,str',
337
330
             {'repository_format': 'D1',
338
331
              'repository_format_to': 'D2',
339
332
              'transport_readonly_server': 'b',
340
 
              'transport_server': 'a'})],
 
333
              'transport_server': 'a',
 
334
              'extra_setup': 'D3'})],
341
335
            scenarios)
342
336
 
343
337
 
624
618
        result = test.run()
625
619
        total_failures = result.errors + result.failures
626
620
        if self._lock_check_thorough:
627
 
            self.assertLength(1, total_failures)
 
621
            self.assertEqual(1, len(total_failures))
628
622
        else:
629
623
            # When _lock_check_thorough is disabled, then we don't trigger a
630
624
            # failure
631
 
            self.assertLength(0, total_failures)
 
625
            self.assertEqual(0, len(total_failures))
632
626
 
633
627
 
634
628
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
846
840
        self.assertContainsRe(output,
847
841
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
848
842
 
 
843
    def test_uses_time_from_testtools(self):
 
844
        """Test case timings in verbose results should use testtools times"""
 
845
        import datetime
 
846
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
847
            def startTest(self, test):
 
848
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
849
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
850
            def addSuccess(self, test):
 
851
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
852
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
853
            def report_tests_starting(self): pass
 
854
        sio = StringIO()
 
855
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
856
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
857
 
849
858
    def test_known_failure(self):
850
859
        """A KnownFailure being raised should trigger several result actions."""
851
860
        class InstrumentedTestResult(tests.ExtendedTestResult):
852
861
            def stopTestRun(self): pass
853
 
            def startTests(self): pass
854
 
            def report_test_start(self, test): pass
 
862
            def report_tests_starting(self): pass
855
863
            def report_known_failure(self, test, err=None, details=None):
856
864
                self._call = test, 'known failure'
857
865
        result = InstrumentedTestResult(None, None, None, None)
907
915
        """Test the behaviour of invoking addNotSupported."""
908
916
        class InstrumentedTestResult(tests.ExtendedTestResult):
909
917
            def stopTestRun(self): pass
910
 
            def startTests(self): pass
911
 
            def report_test_start(self, test): pass
 
918
            def report_tests_starting(self): pass
912
919
            def report_unsupported(self, test, feature):
913
920
                self._call = test, feature
914
921
        result = InstrumentedTestResult(None, None, None, None)
953
960
        """An UnavailableFeature being raised should invoke addNotSupported."""
954
961
        class InstrumentedTestResult(tests.ExtendedTestResult):
955
962
            def stopTestRun(self): pass
956
 
            def startTests(self): pass
957
 
            def report_test_start(self, test): pass
 
963
            def report_tests_starting(self): pass
958
964
            def addNotSupported(self, test, feature):
959
965
                self._call = test, feature
960
966
        result = InstrumentedTestResult(None, None, None, None)
1002
1008
        class InstrumentedTestResult(tests.ExtendedTestResult):
1003
1009
            calls = 0
1004
1010
            def startTests(self): self.calls += 1
1005
 
            def report_test_start(self, test): pass
1006
1011
        result = InstrumentedTestResult(None, None, None, None)
1007
1012
        def test_function():
1008
1013
            pass
1010
1015
        test.run(result)
1011
1016
        self.assertEquals(1, result.calls)
1012
1017
 
 
1018
    def test_startTests_only_once(self):
 
1019
        """With multiple tests startTests should still only be called once"""
 
1020
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1021
            calls = 0
 
1022
            def startTests(self): self.calls += 1
 
1023
        result = InstrumentedTestResult(None, None, None, None)
 
1024
        suite = unittest.TestSuite([
 
1025
            unittest.FunctionTestCase(lambda: None),
 
1026
            unittest.FunctionTestCase(lambda: None)])
 
1027
        suite.run(result)
 
1028
        self.assertEquals(1, result.calls)
 
1029
        self.assertEquals(2, result.count)
 
1030
 
1013
1031
 
1014
1032
class TestUnicodeFilenameFeature(tests.TestCase):
1015
1033
 
1036
1054
        because of our use of global state.
1037
1055
        """
1038
1056
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1039
 
        old_leak = tests.TestCase._first_thread_leaker_id
1040
1057
        try:
1041
1058
            tests.TestCaseInTempDir.TEST_ROOT = None
1042
 
            tests.TestCase._first_thread_leaker_id = None
1043
1059
            return testrunner.run(test)
1044
1060
        finally:
1045
1061
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1046
 
            tests.TestCase._first_thread_leaker_id = old_leak
1047
1062
 
1048
1063
    def test_known_failure_failed_run(self):
1049
1064
        # run a test that generates a known failure which should be printed in
1095
1110
    def test_result_decorator(self):
1096
1111
        # decorate results
1097
1112
        calls = []
1098
 
        class LoggingDecorator(tests.ForwardingResult):
 
1113
        class LoggingDecorator(ExtendedToOriginalDecorator):
1099
1114
            def startTest(self, test):
1100
 
                tests.ForwardingResult.startTest(self, test)
 
1115
                ExtendedToOriginalDecorator.startTest(self, test)
1101
1116
                calls.append('start')
1102
1117
        test = unittest.FunctionTestCase(lambda:None)
1103
1118
        stream = StringIO()
1227
1242
        self.assertContainsRe(output_string, "--date [0-9.]+")
1228
1243
        self.assertLength(1, self._get_source_tree_calls)
1229
1244
 
 
1245
    def test_verbose_test_count(self):
 
1246
        """A verbose test run reports the right test count at the start"""
 
1247
        suite = TestUtil.TestSuite([
 
1248
            unittest.FunctionTestCase(lambda:None),
 
1249
            unittest.FunctionTestCase(lambda:None)])
 
1250
        self.assertEqual(suite.countTestCases(), 2)
 
1251
        stream = StringIO()
 
1252
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1253
        # Need to use the CountingDecorator as that's what sets num_tests
 
1254
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1255
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1256
 
1230
1257
    def test_startTestRun(self):
1231
1258
        """run should call result.startTestRun()"""
1232
1259
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1260
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1261
            def startTestRun(self):
1235
 
                tests.ForwardingResult.startTestRun(self)
 
1262
                ExtendedToOriginalDecorator.startTestRun(self)
1236
1263
                calls.append('startTestRun')
1237
1264
        test = unittest.FunctionTestCase(lambda:None)
1238
1265
        stream = StringIO()
1244
1271
    def test_stopTestRun(self):
1245
1272
        """run should call result.stopTestRun()"""
1246
1273
        calls = []
1247
 
        class LoggingDecorator(tests.ForwardingResult):
 
1274
        class LoggingDecorator(ExtendedToOriginalDecorator):
1248
1275
            def stopTestRun(self):
1249
 
                tests.ForwardingResult.stopTestRun(self)
 
1276
                ExtendedToOriginalDecorator.stopTestRun(self)
1250
1277
                calls.append('stopTestRun')
1251
1278
        test = unittest.FunctionTestCase(lambda:None)
1252
1279
        stream = StringIO()
1686
1713
        self.assertEqual('original', obj.test_attr)
1687
1714
 
1688
1715
 
 
1716
class _MissingFeature(tests.Feature):
 
1717
    def _probe(self):
 
1718
        return False
 
1719
missing_feature = _MissingFeature()
 
1720
 
 
1721
 
 
1722
def _get_test(name):
 
1723
    """Get an instance of a specific example test.
 
1724
 
 
1725
    We protect this in a function so that they don't auto-run in the test
 
1726
    suite.
 
1727
    """
 
1728
 
 
1729
    class ExampleTests(tests.TestCase):
 
1730
 
 
1731
        def test_fail(self):
 
1732
            mutter('this was a failing test')
 
1733
            self.fail('this test will fail')
 
1734
 
 
1735
        def test_error(self):
 
1736
            mutter('this test errored')
 
1737
            raise RuntimeError('gotcha')
 
1738
 
 
1739
        def test_missing_feature(self):
 
1740
            mutter('missing the feature')
 
1741
            self.requireFeature(missing_feature)
 
1742
 
 
1743
        def test_skip(self):
 
1744
            mutter('this test will be skipped')
 
1745
            raise tests.TestSkipped('reason')
 
1746
 
 
1747
        def test_success(self):
 
1748
            mutter('this test succeeds')
 
1749
 
 
1750
        def test_xfail(self):
 
1751
            mutter('test with expected failure')
 
1752
            self.knownFailure('this_fails')
 
1753
 
 
1754
        def test_unexpected_success(self):
 
1755
            mutter('test with unexpected success')
 
1756
            self.expectFailure('should_fail', lambda: None)
 
1757
 
 
1758
    return ExampleTests(name)
 
1759
 
 
1760
 
 
1761
class TestTestCaseLogDetails(tests.TestCase):
 
1762
 
 
1763
    def _run_test(self, test_name):
 
1764
        test = _get_test(test_name)
 
1765
        result = testtools.TestResult()
 
1766
        test.run(result)
 
1767
        return result
 
1768
 
 
1769
    def test_fail_has_log(self):
 
1770
        result = self._run_test('test_fail')
 
1771
        self.assertEqual(1, len(result.failures))
 
1772
        result_content = result.failures[0][1]
 
1773
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1774
        self.assertContainsRe(result_content, 'this was a failing test')
 
1775
 
 
1776
    def test_error_has_log(self):
 
1777
        result = self._run_test('test_error')
 
1778
        self.assertEqual(1, len(result.errors))
 
1779
        result_content = result.errors[0][1]
 
1780
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1781
        self.assertContainsRe(result_content, 'this test errored')
 
1782
 
 
1783
    def test_skip_has_no_log(self):
 
1784
        result = self._run_test('test_skip')
 
1785
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1786
        skips = result.skip_reasons['reason']
 
1787
        self.assertEqual(1, len(skips))
 
1788
        test = skips[0]
 
1789
        self.assertFalse('log' in test.getDetails())
 
1790
 
 
1791
    def test_missing_feature_has_no_log(self):
 
1792
        # testtools doesn't know about addNotSupported, so it just gets
 
1793
        # considered as a skip
 
1794
        result = self._run_test('test_missing_feature')
 
1795
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1796
        skips = result.skip_reasons[missing_feature]
 
1797
        self.assertEqual(1, len(skips))
 
1798
        test = skips[0]
 
1799
        self.assertFalse('log' in test.getDetails())
 
1800
 
 
1801
    def test_xfail_has_no_log(self):
 
1802
        result = self._run_test('test_xfail')
 
1803
        self.assertEqual(1, len(result.expectedFailures))
 
1804
        result_content = result.expectedFailures[0][1]
 
1805
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1806
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1807
 
 
1808
    def test_unexpected_success_has_log(self):
 
1809
        result = self._run_test('test_unexpected_success')
 
1810
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1811
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1812
        # expectedFailures is a list of reasons?
 
1813
        test = result.unexpectedSuccesses[0]
 
1814
        details = test.getDetails()
 
1815
        self.assertTrue('log' in details)
 
1816
 
 
1817
 
 
1818
class TestTestCloning(tests.TestCase):
 
1819
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1820
 
 
1821
    def test_cloned_testcase_does_not_share_details(self):
 
1822
        """A TestCase cloned with clone_test does not share mutable attributes
 
1823
        such as details or cleanups.
 
1824
        """
 
1825
        class Test(tests.TestCase):
 
1826
            def test_foo(self):
 
1827
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1828
        orig_test = Test('test_foo')
 
1829
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1830
        orig_test.run(unittest.TestResult())
 
1831
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1832
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1833
 
 
1834
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1835
        """Applying two levels of scenarios to a test preserves the attributes
 
1836
        added by both scenarios.
 
1837
        """
 
1838
        class Test(tests.TestCase):
 
1839
            def test_foo(self):
 
1840
                pass
 
1841
        test = Test('test_foo')
 
1842
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1843
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1844
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1845
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1846
        all_tests = list(tests.iter_suite_tests(suite))
 
1847
        self.assertLength(4, all_tests)
 
1848
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1849
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1850
 
 
1851
 
1689
1852
# NB: Don't delete this; it's not actually from 0.11!
1690
1853
@deprecated_function(deprecated_in((0, 11, 0)))
1691
1854
def sample_deprecated_function():
1845
2008
                tree.branch.repository.bzrdir.root_transport)
1846
2009
 
1847
2010
 
1848
 
class SelfTestHelper:
 
2011
class SelfTestHelper(object):
1849
2012
 
1850
2013
    def run_selftest(self, **kwargs):
1851
2014
        """Run selftest returning its output."""
1911
2074
            def __call__(test, result):
1912
2075
                test.run(result)
1913
2076
            def run(test, result):
1914
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2077
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1915
2078
                calls.append("called")
1916
2079
            def countTestCases(self):
1917
2080
                return 1
2002
2165
            load_list='missing file name', list_only=True)
2003
2166
 
2004
2167
 
 
2168
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2169
 
 
2170
    _test_needs_features = [features.subunit]
 
2171
 
 
2172
    def run_subunit_stream(self, test_name):
 
2173
        from subunit import ProtocolTestCase
 
2174
        def factory():
 
2175
            return TestUtil.TestSuite([_get_test(test_name)])
 
2176
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2177
            test_suite_factory=factory)
 
2178
        test = ProtocolTestCase(stream)
 
2179
        result = testtools.TestResult()
 
2180
        test.run(result)
 
2181
        content = stream.getvalue()
 
2182
        return content, result
 
2183
 
 
2184
    def test_fail_has_log(self):
 
2185
        content, result = self.run_subunit_stream('test_fail')
 
2186
        self.assertEqual(1, len(result.failures))
 
2187
        self.assertContainsRe(content, '(?m)^log$')
 
2188
        self.assertContainsRe(content, 'this test will fail')
 
2189
 
 
2190
    def test_error_has_log(self):
 
2191
        content, result = self.run_subunit_stream('test_error')
 
2192
        self.assertContainsRe(content, '(?m)^log$')
 
2193
        self.assertContainsRe(content, 'this test errored')
 
2194
 
 
2195
    def test_skip_has_no_log(self):
 
2196
        content, result = self.run_subunit_stream('test_skip')
 
2197
        self.assertNotContainsRe(content, '(?m)^log$')
 
2198
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2199
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2200
        skips = result.skip_reasons['reason']
 
2201
        self.assertEqual(1, len(skips))
 
2202
        test = skips[0]
 
2203
        # RemotedTestCase doesn't preserve the "details"
 
2204
        ## self.assertFalse('log' in test.getDetails())
 
2205
 
 
2206
    def test_missing_feature_has_no_log(self):
 
2207
        content, result = self.run_subunit_stream('test_missing_feature')
 
2208
        self.assertNotContainsRe(content, '(?m)^log$')
 
2209
        self.assertNotContainsRe(content, 'missing the feature')
 
2210
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2211
        skips = result.skip_reasons['_MissingFeature\n']
 
2212
        self.assertEqual(1, len(skips))
 
2213
        test = skips[0]
 
2214
        # RemotedTestCase doesn't preserve the "details"
 
2215
        ## self.assertFalse('log' in test.getDetails())
 
2216
 
 
2217
    def test_xfail_has_no_log(self):
 
2218
        content, result = self.run_subunit_stream('test_xfail')
 
2219
        self.assertNotContainsRe(content, '(?m)^log$')
 
2220
        self.assertNotContainsRe(content, 'test with expected failure')
 
2221
        self.assertEqual(1, len(result.expectedFailures))
 
2222
        result_content = result.expectedFailures[0][1]
 
2223
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2224
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2225
 
 
2226
    def test_unexpected_success_has_log(self):
 
2227
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2228
        self.assertContainsRe(content, '(?m)^log$')
 
2229
        self.assertContainsRe(content, 'test with unexpected success')
 
2230
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2231
                           ' as a plain success',
 
2232
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2233
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2234
        test = result.unexpectedSuccesses[0]
 
2235
        # RemotedTestCase doesn't preserve the "details"
 
2236
        ## self.assertTrue('log' in test.getDetails())
 
2237
 
 
2238
    def test_success_has_no_log(self):
 
2239
        content, result = self.run_subunit_stream('test_success')
 
2240
        self.assertEqual(1, result.testsRun)
 
2241
        self.assertNotContainsRe(content, '(?m)^log$')
 
2242
        self.assertNotContainsRe(content, 'this test succeeds')
 
2243
 
 
2244
 
2005
2245
class TestRunBzr(tests.TestCase):
2006
2246
 
2007
2247
    out = ''
2996
3236
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2997
3237
 
2998
3238
 
 
3239
class TestThreadLeakDetection(tests.TestCase):
 
3240
    """Ensure when tests leak threads we detect and report it"""
 
3241
 
 
3242
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3243
        def __init__(self):
 
3244
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3245
            self.leaks = []
 
3246
        def _report_thread_leak(self, test, leaks, alive):
 
3247
            self.leaks.append((test, leaks))
 
3248
 
 
3249
    def test_testcase_without_addCleanups(self):
 
3250
        """Check old TestCase instances don't break with leak detection"""
 
3251
        class Test(unittest.TestCase):
 
3252
            def runTest(self):
 
3253
                pass
 
3254
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3255
        result = self.LeakRecordingResult()
 
3256
        test = Test()
 
3257
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3262
        self.assertEqual(result.leaks, [])
 
3263
        
 
3264
    def test_thread_leak(self):
 
3265
        """Ensure a thread that outlives the running of a test is reported
 
3266
 
 
3267
        Uses a thread that blocks on an event, and is started by the inner
 
3268
        test case. As the thread outlives the inner case's run, it should be
 
3269
        detected as a leak, but the event is then set so that the thread can
 
3270
        be safely joined in cleanup so it's not leaked for real.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3274
        class Test(tests.TestCase):
 
3275
            def test_leak(self):
 
3276
                thread.start()
 
3277
        result = self.LeakRecordingResult()
 
3278
        test = Test("test_leak")
 
3279
        self.addCleanup(thread.join)
 
3280
        self.addCleanup(event.set)
 
3281
        result.startTestRun()
 
3282
        test.run(result)
 
3283
        result.stopTestRun()
 
3284
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3285
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3286
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3287
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3288
 
 
3289
    def test_multiple_leaks(self):
 
3290
        """Check multiple leaks are blamed on the test cases at fault
 
3291
 
 
3292
        Same concept as the previous test, but has one inner test method that
 
3293
        leaks two threads, and one that doesn't leak at all.
 
3294
        """
 
3295
        event = threading.Event()
 
3296
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3297
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3298
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3299
        class Test(tests.TestCase):
 
3300
            def test_first_leak(self):
 
3301
                thread_b.start()
 
3302
            def test_second_no_leak(self):
 
3303
                pass
 
3304
            def test_third_leak(self):
 
3305
                thread_c.start()
 
3306
                thread_a.start()
 
3307
        result = self.LeakRecordingResult()
 
3308
        first_test = Test("test_first_leak")
 
3309
        third_test = Test("test_third_leak")
 
3310
        self.addCleanup(thread_a.join)
 
3311
        self.addCleanup(thread_b.join)
 
3312
        self.addCleanup(thread_c.join)
 
3313
        self.addCleanup(event.set)
 
3314
        result.startTestRun()
 
3315
        unittest.TestSuite(
 
3316
            [first_test, Test("test_second_no_leak"), third_test]
 
3317
            ).run(result)
 
3318
        result.stopTestRun()
 
3319
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3320
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3321
        self.assertEqual(result.leaks, [
 
3322
            (first_test, set([thread_b])),
 
3323
            (third_test, set([thread_a, thread_c]))])
 
3324
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3325
 
 
3326
 
 
3327
class TestPostMortemDebugging(tests.TestCase):
 
3328
    """Check post mortem debugging works when tests fail or error"""
 
3329
 
 
3330
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3331
        def __init__(self):
 
3332
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3333
            self.postcode = None
 
3334
        def _post_mortem(self, tb=None):
 
3335
            """Record the code object at the end of the current traceback"""
 
3336
            tb = tb or sys.exc_info()[2]
 
3337
            if tb is not None:
 
3338
                next = tb.tb_next
 
3339
                while next is not None:
 
3340
                    tb = next
 
3341
                    next = next.tb_next
 
3342
                self.postcode = tb.tb_frame.f_code
 
3343
        def report_error(self, test, err):
 
3344
            pass
 
3345
        def report_failure(self, test, err):
 
3346
            pass
 
3347
 
 
3348
    def test_location_unittest_error(self):
 
3349
        """Needs right post mortem traceback with erroring unittest case"""
 
3350
        class Test(unittest.TestCase):
 
3351
            def runTest(self):
 
3352
                raise RuntimeError
 
3353
        result = self.TracebackRecordingResult()
 
3354
        Test().run(result)
 
3355
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3356
 
 
3357
    def test_location_unittest_failure(self):
 
3358
        """Needs right post mortem traceback with failing unittest case"""
 
3359
        class Test(unittest.TestCase):
 
3360
            def runTest(self):
 
3361
                raise self.failureException
 
3362
        result = self.TracebackRecordingResult()
 
3363
        Test().run(result)
 
3364
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3365
 
 
3366
    def test_location_bt_error(self):
 
3367
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3368
        class Test(tests.TestCase):
 
3369
            def test_error(self):
 
3370
                raise RuntimeError
 
3371
        result = self.TracebackRecordingResult()
 
3372
        Test("test_error").run(result)
 
3373
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3374
 
 
3375
    def test_location_bt_failure(self):
 
3376
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3377
        class Test(tests.TestCase):
 
3378
            def test_failure(self):
 
3379
                raise self.failureException
 
3380
        result = self.TracebackRecordingResult()
 
3381
        Test("test_failure").run(result)
 
3382
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3383
 
 
3384
    def test_env_var_triggers_post_mortem(self):
 
3385
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3386
        import pdb
 
3387
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3388
        post_mortem_calls = []
 
3389
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3390
        self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
 
3391
            osutils.set_or_unset_env("BZR_TEST_PDB", None))
 
3392
        result._post_mortem(1)
 
3393
        os.environ["BZR_TEST_PDB"] = "on"
 
3394
        result._post_mortem(2)
 
3395
        self.assertEqual([2], post_mortem_calls)
 
3396
 
 
3397
 
2999
3398
class TestRunSuite(tests.TestCase):
3000
3399
 
3001
3400
    def test_runner_class(self):