/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Vincent Ladeuil
  • Date: 2010-10-07 14:34:39 UTC
  • Merged to: This revision was merged to the branch mainline in revision 5469.
  • Revision ID: v.ladeuil+lp@free.fr-20101007143439-1mncq23hk2h9xxl3
Bump API version to 2.3.0.

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
67
68
    test_sftp_transport,
68
69
    TestUtil,
69
70
    )
70
 
from bzrlib.trace import note
 
71
from bzrlib.trace import note, mutter
71
72
from bzrlib.transport import memory
72
73
from bzrlib.version import _get_bzr_source_tree
73
74
 
122
123
        self.failUnlessExists(filename)
123
124
 
124
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
125
139
class TestTransportScenarios(tests.TestCase):
126
140
    """A group of tests that test the transport implementation adaption core.
127
141
 
208
222
    def test_scenarios(self):
209
223
        # check that constructor parameters are passed through to the adapted
210
224
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
212
226
        vfs_factory = "v"
213
227
        server1 = "a"
214
228
        server2 = "b"
312
326
        from bzrlib.tests.per_interrepository import make_scenarios
313
327
        server1 = "a"
314
328
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
        scenarios = make_scenarios(server1, server2, formats)
317
331
        self.assertEqual([
318
332
            ('C0,str,str',
319
333
             {'repository_format': 'C1',
320
334
              'repository_format_to': 'C2',
321
335
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
323
338
            ('D0,str,str',
324
339
             {'repository_format': 'D1',
325
340
              'repository_format_to': 'D2',
326
341
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
328
344
            scenarios)
329
345
 
330
346
 
609
625
                l.attempt_lock()
610
626
        test = TestDanglingLock('test_function')
611
627
        result = test.run()
 
628
        total_failures = result.errors + result.failures
612
629
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
630
            self.assertEqual(1, len(total_failures))
614
631
        else:
615
632
            # When _lock_check_thorough is disabled, then we don't trigger a
616
633
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
634
            self.assertEqual(0, len(total_failures))
618
635
 
619
636
 
620
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
638
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
639
 
623
640
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
641
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
642
        self.vfs_transport_factory = memory.MemoryServer
627
643
        self.transport_readonly_server = None
629
645
        # for the server
630
646
        url = self.get_readonly_url()
631
647
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
634
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
653
 
638
654
    def test_get_readonly_url_http(self):
639
655
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
656
        from bzrlib.transport.http import HttpTransportBase
642
657
        self.transport_server = test_server.LocalURLServer
643
658
        self.transport_readonly_server = HttpServer
645
660
        url = self.get_readonly_url()
646
661
        url2 = self.get_readonly_url('foo/bar')
647
662
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
650
665
        self.failUnless(isinstance(t, HttpTransportBase))
651
666
        self.failUnless(isinstance(t2, HttpTransportBase))
652
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
705
class TestChrootedTest(tests.ChrootedTestCase):
691
706
 
692
707
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
695
709
        url = t.base
696
710
        self.assertEqual(url, t.clone('..').base)
697
711
 
803
817
        self.requireFeature(test_lsprof.LSProfFeature)
804
818
        result_stream = StringIO()
805
819
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
820
            result_stream,
807
821
            descriptions=0,
808
822
            verbosity=2,
809
823
            )
835
849
        self.assertContainsRe(output,
836
850
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
851
 
 
852
    def test_uses_time_from_testtools(self):
 
853
        """Test case timings in verbose results should use testtools times"""
 
854
        import datetime
 
855
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
856
            def startTest(self, test):
 
857
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
858
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
859
            def addSuccess(self, test):
 
860
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
861
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
862
            def report_tests_starting(self): pass
 
863
        sio = StringIO()
 
864
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
865
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
866
 
838
867
    def test_known_failure(self):
839
868
        """A KnownFailure being raised should trigger several result actions."""
840
869
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
870
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
871
            def report_tests_starting(self): pass
844
872
            def report_known_failure(self, test, err=None, details=None):
845
873
                self._call = test, 'known failure'
846
874
        result = InstrumentedTestResult(None, None, None, None)
864
892
        # verbose test output formatting
865
893
        result_stream = StringIO()
866
894
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
895
            result_stream,
868
896
            descriptions=0,
869
897
            verbosity=2,
870
898
            )
880
908
        output = result_stream.getvalue()[prefix:]
881
909
        lines = output.splitlines()
882
910
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
911
        if sys.version_info > (2, 7):
 
912
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
913
                self.assertNotEqual, lines[1], '    ')
883
914
        self.assertEqual(lines[1], '    foo')
884
915
        self.assertEqual(2, len(lines))
885
916
 
893
924
        """Test the behaviour of invoking addNotSupported."""
894
925
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
926
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
927
            def report_tests_starting(self): pass
898
928
            def report_unsupported(self, test, feature):
899
929
                self._call = test, feature
900
930
        result = InstrumentedTestResult(None, None, None, None)
919
949
        # verbose test output formatting
920
950
        result_stream = StringIO()
921
951
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
952
            result_stream,
923
953
            descriptions=0,
924
954
            verbosity=2,
925
955
            )
939
969
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
970
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
971
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
972
            def report_tests_starting(self): pass
944
973
            def addNotSupported(self, test, feature):
945
974
                self._call = test, feature
946
975
        result = InstrumentedTestResult(None, None, None, None)
988
1017
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1018
            calls = 0
990
1019
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1020
        result = InstrumentedTestResult(None, None, None, None)
993
1021
        def test_function():
994
1022
            pass
996
1024
        test.run(result)
997
1025
        self.assertEquals(1, result.calls)
998
1026
 
 
1027
    def test_startTests_only_once(self):
 
1028
        """With multiple tests startTests should still only be called once"""
 
1029
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1030
            calls = 0
 
1031
            def startTests(self): self.calls += 1
 
1032
        result = InstrumentedTestResult(None, None, None, None)
 
1033
        suite = unittest.TestSuite([
 
1034
            unittest.FunctionTestCase(lambda: None),
 
1035
            unittest.FunctionTestCase(lambda: None)])
 
1036
        suite.run(result)
 
1037
        self.assertEquals(1, result.calls)
 
1038
        self.assertEquals(2, result.count)
 
1039
 
999
1040
 
1000
1041
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1042
 
1022
1063
        because of our use of global state.
1023
1064
        """
1024
1065
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1066
        try:
1027
1067
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1068
            return testrunner.run(test)
1030
1069
        finally:
1031
1070
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1071
 
1034
1072
    def test_known_failure_failed_run(self):
1035
1073
        # run a test that generates a known failure which should be printed in
1213
1251
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1252
        self.assertLength(1, self._get_source_tree_calls)
1215
1253
 
 
1254
    def test_verbose_test_count(self):
 
1255
        """A verbose test run reports the right test count at the start"""
 
1256
        suite = TestUtil.TestSuite([
 
1257
            unittest.FunctionTestCase(lambda:None),
 
1258
            unittest.FunctionTestCase(lambda:None)])
 
1259
        self.assertEqual(suite.countTestCases(), 2)
 
1260
        stream = StringIO()
 
1261
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1262
        # Need to use the CountingDecorator as that's what sets num_tests
 
1263
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1264
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1265
 
1216
1266
    def test_startTestRun(self):
1217
1267
        """run should call result.startTestRun()"""
1218
1268
        calls = []
1421
1471
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1472
        output_stream = StringIO()
1423
1473
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1474
            output_stream,
1425
1475
            descriptions=0,
1426
1476
            verbosity=2)
1427
1477
        sample_test.run(result)
1655
1705
        self.assertEqual('original', obj.test_attr)
1656
1706
 
1657
1707
 
 
1708
class _MissingFeature(tests.Feature):
 
1709
    def _probe(self):
 
1710
        return False
 
1711
missing_feature = _MissingFeature()
 
1712
 
 
1713
 
 
1714
def _get_test(name):
 
1715
    """Get an instance of a specific example test.
 
1716
 
 
1717
    We protect this in a function so that they don't auto-run in the test
 
1718
    suite.
 
1719
    """
 
1720
 
 
1721
    class ExampleTests(tests.TestCase):
 
1722
 
 
1723
        def test_fail(self):
 
1724
            mutter('this was a failing test')
 
1725
            self.fail('this test will fail')
 
1726
 
 
1727
        def test_error(self):
 
1728
            mutter('this test errored')
 
1729
            raise RuntimeError('gotcha')
 
1730
 
 
1731
        def test_missing_feature(self):
 
1732
            mutter('missing the feature')
 
1733
            self.requireFeature(missing_feature)
 
1734
 
 
1735
        def test_skip(self):
 
1736
            mutter('this test will be skipped')
 
1737
            raise tests.TestSkipped('reason')
 
1738
 
 
1739
        def test_success(self):
 
1740
            mutter('this test succeeds')
 
1741
 
 
1742
        def test_xfail(self):
 
1743
            mutter('test with expected failure')
 
1744
            self.knownFailure('this_fails')
 
1745
 
 
1746
        def test_unexpected_success(self):
 
1747
            mutter('test with unexpected success')
 
1748
            self.expectFailure('should_fail', lambda: None)
 
1749
 
 
1750
    return ExampleTests(name)
 
1751
 
 
1752
 
 
1753
class TestTestCaseLogDetails(tests.TestCase):
 
1754
 
 
1755
    def _run_test(self, test_name):
 
1756
        test = _get_test(test_name)
 
1757
        result = testtools.TestResult()
 
1758
        test.run(result)
 
1759
        return result
 
1760
 
 
1761
    def test_fail_has_log(self):
 
1762
        result = self._run_test('test_fail')
 
1763
        self.assertEqual(1, len(result.failures))
 
1764
        result_content = result.failures[0][1]
 
1765
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1766
        self.assertContainsRe(result_content, 'this was a failing test')
 
1767
 
 
1768
    def test_error_has_log(self):
 
1769
        result = self._run_test('test_error')
 
1770
        self.assertEqual(1, len(result.errors))
 
1771
        result_content = result.errors[0][1]
 
1772
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1773
        self.assertContainsRe(result_content, 'this test errored')
 
1774
 
 
1775
    def test_skip_has_no_log(self):
 
1776
        result = self._run_test('test_skip')
 
1777
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1778
        skips = result.skip_reasons['reason']
 
1779
        self.assertEqual(1, len(skips))
 
1780
        test = skips[0]
 
1781
        self.assertFalse('log' in test.getDetails())
 
1782
 
 
1783
    def test_missing_feature_has_no_log(self):
 
1784
        # testtools doesn't know about addNotSupported, so it just gets
 
1785
        # considered as a skip
 
1786
        result = self._run_test('test_missing_feature')
 
1787
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1788
        skips = result.skip_reasons[missing_feature]
 
1789
        self.assertEqual(1, len(skips))
 
1790
        test = skips[0]
 
1791
        self.assertFalse('log' in test.getDetails())
 
1792
 
 
1793
    def test_xfail_has_no_log(self):
 
1794
        result = self._run_test('test_xfail')
 
1795
        self.assertEqual(1, len(result.expectedFailures))
 
1796
        result_content = result.expectedFailures[0][1]
 
1797
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1798
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1799
 
 
1800
    def test_unexpected_success_has_log(self):
 
1801
        result = self._run_test('test_unexpected_success')
 
1802
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1803
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1804
        # expectedFailures is a list of reasons?
 
1805
        test = result.unexpectedSuccesses[0]
 
1806
        details = test.getDetails()
 
1807
        self.assertTrue('log' in details)
 
1808
 
 
1809
 
 
1810
class TestTestCloning(tests.TestCase):
 
1811
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1812
 
 
1813
    def test_cloned_testcase_does_not_share_details(self):
 
1814
        """A TestCase cloned with clone_test does not share mutable attributes
 
1815
        such as details or cleanups.
 
1816
        """
 
1817
        class Test(tests.TestCase):
 
1818
            def test_foo(self):
 
1819
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1820
        orig_test = Test('test_foo')
 
1821
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1822
        orig_test.run(unittest.TestResult())
 
1823
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1824
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1825
 
 
1826
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1827
        """Applying two levels of scenarios to a test preserves the attributes
 
1828
        added by both scenarios.
 
1829
        """
 
1830
        class Test(tests.TestCase):
 
1831
            def test_foo(self):
 
1832
                pass
 
1833
        test = Test('test_foo')
 
1834
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1835
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1836
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1837
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1838
        all_tests = list(tests.iter_suite_tests(suite))
 
1839
        self.assertLength(4, all_tests)
 
1840
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1841
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1842
 
 
1843
 
1658
1844
# NB: Don't delete this; it's not actually from 0.11!
1659
1845
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1846
def sample_deprecated_function():
1814
2000
                tree.branch.repository.bzrdir.root_transport)
1815
2001
 
1816
2002
 
1817
 
class SelfTestHelper:
 
2003
class SelfTestHelper(object):
1818
2004
 
1819
2005
    def run_selftest(self, **kwargs):
1820
2006
        """Run selftest returning its output."""
1971
2157
            load_list='missing file name', list_only=True)
1972
2158
 
1973
2159
 
 
2160
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2161
 
 
2162
    _test_needs_features = [features.subunit]
 
2163
 
 
2164
    def run_subunit_stream(self, test_name):
 
2165
        from subunit import ProtocolTestCase
 
2166
        def factory():
 
2167
            return TestUtil.TestSuite([_get_test(test_name)])
 
2168
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2169
            test_suite_factory=factory)
 
2170
        test = ProtocolTestCase(stream)
 
2171
        result = testtools.TestResult()
 
2172
        test.run(result)
 
2173
        content = stream.getvalue()
 
2174
        return content, result
 
2175
 
 
2176
    def test_fail_has_log(self):
 
2177
        content, result = self.run_subunit_stream('test_fail')
 
2178
        self.assertEqual(1, len(result.failures))
 
2179
        self.assertContainsRe(content, '(?m)^log$')
 
2180
        self.assertContainsRe(content, 'this test will fail')
 
2181
 
 
2182
    def test_error_has_log(self):
 
2183
        content, result = self.run_subunit_stream('test_error')
 
2184
        self.assertContainsRe(content, '(?m)^log$')
 
2185
        self.assertContainsRe(content, 'this test errored')
 
2186
 
 
2187
    def test_skip_has_no_log(self):
 
2188
        content, result = self.run_subunit_stream('test_skip')
 
2189
        self.assertNotContainsRe(content, '(?m)^log$')
 
2190
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2191
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2192
        skips = result.skip_reasons['reason']
 
2193
        self.assertEqual(1, len(skips))
 
2194
        test = skips[0]
 
2195
        # RemotedTestCase doesn't preserve the "details"
 
2196
        ## self.assertFalse('log' in test.getDetails())
 
2197
 
 
2198
    def test_missing_feature_has_no_log(self):
 
2199
        content, result = self.run_subunit_stream('test_missing_feature')
 
2200
        self.assertNotContainsRe(content, '(?m)^log$')
 
2201
        self.assertNotContainsRe(content, 'missing the feature')
 
2202
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2203
        skips = result.skip_reasons['_MissingFeature\n']
 
2204
        self.assertEqual(1, len(skips))
 
2205
        test = skips[0]
 
2206
        # RemotedTestCase doesn't preserve the "details"
 
2207
        ## self.assertFalse('log' in test.getDetails())
 
2208
 
 
2209
    def test_xfail_has_no_log(self):
 
2210
        content, result = self.run_subunit_stream('test_xfail')
 
2211
        self.assertNotContainsRe(content, '(?m)^log$')
 
2212
        self.assertNotContainsRe(content, 'test with expected failure')
 
2213
        self.assertEqual(1, len(result.expectedFailures))
 
2214
        result_content = result.expectedFailures[0][1]
 
2215
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2216
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2217
 
 
2218
    def test_unexpected_success_has_log(self):
 
2219
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2220
        self.assertContainsRe(content, '(?m)^log$')
 
2221
        self.assertContainsRe(content, 'test with unexpected success')
 
2222
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2223
                           ' as a plain success',
 
2224
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2225
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2226
        test = result.unexpectedSuccesses[0]
 
2227
        # RemotedTestCase doesn't preserve the "details"
 
2228
        ## self.assertTrue('log' in test.getDetails())
 
2229
 
 
2230
    def test_success_has_no_log(self):
 
2231
        content, result = self.run_subunit_stream('test_success')
 
2232
        self.assertEqual(1, result.testsRun)
 
2233
        self.assertNotContainsRe(content, '(?m)^log$')
 
2234
        self.assertNotContainsRe(content, 'this test succeeds')
 
2235
 
 
2236
 
1974
2237
class TestRunBzr(tests.TestCase):
1975
2238
 
1976
2239
    out = ''
2339
2602
            os.chdir = orig_chdir
2340
2603
        self.assertEqual(['foo', 'current'], chdirs)
2341
2604
 
 
2605
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2606
        self.get_source_path = lambda: ""
 
2607
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2608
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2609
 
2342
2610
 
2343
2611
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2612
    """Tests that really need to do things with an external bzr."""
2960
3228
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3229
 
2962
3230
 
 
3231
class TestThreadLeakDetection(tests.TestCase):
 
3232
    """Ensure when tests leak threads we detect and report it"""
 
3233
 
 
3234
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3235
        def __init__(self):
 
3236
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3237
            self.leaks = []
 
3238
        def _report_thread_leak(self, test, leaks, alive):
 
3239
            self.leaks.append((test, leaks))
 
3240
 
 
3241
    def test_testcase_without_addCleanups(self):
 
3242
        """Check old TestCase instances don't break with leak detection"""
 
3243
        class Test(unittest.TestCase):
 
3244
            def runTest(self):
 
3245
                pass
 
3246
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3247
        result = self.LeakRecordingResult()
 
3248
        test = Test()
 
3249
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3250
        result.startTestRun()
 
3251
        test.run(result)
 
3252
        result.stopTestRun()
 
3253
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3254
        self.assertEqual(result.leaks, [])
 
3255
        
 
3256
    def test_thread_leak(self):
 
3257
        """Ensure a thread that outlives the running of a test is reported
 
3258
 
 
3259
        Uses a thread that blocks on an event, and is started by the inner
 
3260
        test case. As the thread outlives the inner case's run, it should be
 
3261
        detected as a leak, but the event is then set so that the thread can
 
3262
        be safely joined in cleanup so it's not leaked for real.
 
3263
        """
 
3264
        event = threading.Event()
 
3265
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3266
        class Test(tests.TestCase):
 
3267
            def test_leak(self):
 
3268
                thread.start()
 
3269
        result = self.LeakRecordingResult()
 
3270
        test = Test("test_leak")
 
3271
        self.addCleanup(thread.join)
 
3272
        self.addCleanup(event.set)
 
3273
        result.startTestRun()
 
3274
        test.run(result)
 
3275
        result.stopTestRun()
 
3276
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3277
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3278
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3279
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3280
 
 
3281
    def test_multiple_leaks(self):
 
3282
        """Check multiple leaks are blamed on the test cases at fault
 
3283
 
 
3284
        Same concept as the previous test, but has one inner test method that
 
3285
        leaks two threads, and one that doesn't leak at all.
 
3286
        """
 
3287
        event = threading.Event()
 
3288
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3289
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3290
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3291
        class Test(tests.TestCase):
 
3292
            def test_first_leak(self):
 
3293
                thread_b.start()
 
3294
            def test_second_no_leak(self):
 
3295
                pass
 
3296
            def test_third_leak(self):
 
3297
                thread_c.start()
 
3298
                thread_a.start()
 
3299
        result = self.LeakRecordingResult()
 
3300
        first_test = Test("test_first_leak")
 
3301
        third_test = Test("test_third_leak")
 
3302
        self.addCleanup(thread_a.join)
 
3303
        self.addCleanup(thread_b.join)
 
3304
        self.addCleanup(thread_c.join)
 
3305
        self.addCleanup(event.set)
 
3306
        result.startTestRun()
 
3307
        unittest.TestSuite(
 
3308
            [first_test, Test("test_second_no_leak"), third_test]
 
3309
            ).run(result)
 
3310
        result.stopTestRun()
 
3311
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3312
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3313
        self.assertEqual(result.leaks, [
 
3314
            (first_test, set([thread_b])),
 
3315
            (third_test, set([thread_a, thread_c]))])
 
3316
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3317
 
 
3318
 
2963
3319
class TestRunSuite(tests.TestCase):
2964
3320
 
2965
3321
    def test_runner_class(self):