/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2010-10-15 10:12:12 UTC
  • mto: This revision was merged to the branch mainline in revision 5503.
  • Revision ID: mbp@sourcefrog.net-20101015101212-sg5jki3sl7nz97oe
Remove the redundant news entry about 1.17.1, which never happened.

The irregular name in this entry was accidentally causing problems in 'make doc-website'.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
42
47
    lockdir,
43
48
    memorytree,
44
49
    osutils,
45
 
    progress,
46
50
    remote,
47
51
    repository,
48
52
    symbol_versioning,
67
71
    test_sftp_transport,
68
72
    TestUtil,
69
73
    )
70
 
from bzrlib.trace import note
 
74
from bzrlib.trace import note, mutter
71
75
from bzrlib.transport import memory
72
76
from bzrlib.version import _get_bzr_source_tree
73
77
 
122
126
        self.failUnlessExists(filename)
123
127
 
124
128
 
 
129
class TestClassesAvailable(tests.TestCase):
 
130
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
131
 
 
132
    def test_test_case(self):
 
133
        from bzrlib.tests import TestCase
 
134
 
 
135
    def test_test_loader(self):
 
136
        from bzrlib.tests import TestLoader
 
137
 
 
138
    def test_test_suite(self):
 
139
        from bzrlib.tests import TestSuite
 
140
 
 
141
 
125
142
class TestTransportScenarios(tests.TestCase):
126
143
    """A group of tests that test the transport implementation adaption core.
127
144
 
208
225
    def test_scenarios(self):
209
226
        # check that constructor parameters are passed through to the adapted
210
227
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
228
        from bzrlib.tests.per_controldir import make_scenarios
212
229
        vfs_factory = "v"
213
230
        server1 = "a"
214
231
        server2 = "b"
312
329
        from bzrlib.tests.per_interrepository import make_scenarios
313
330
        server1 = "a"
314
331
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
332
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
333
        scenarios = make_scenarios(server1, server2, formats)
317
334
        self.assertEqual([
318
335
            ('C0,str,str',
319
336
             {'repository_format': 'C1',
320
337
              'repository_format_to': 'C2',
321
338
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
339
              'transport_server': 'a',
 
340
              'extra_setup': 'C3'}),
323
341
            ('D0,str,str',
324
342
             {'repository_format': 'D1',
325
343
              'repository_format_to': 'D2',
326
344
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
345
              'transport_server': 'a',
 
346
              'extra_setup': 'D3'})],
328
347
            scenarios)
329
348
 
330
349
 
609
628
                l.attempt_lock()
610
629
        test = TestDanglingLock('test_function')
611
630
        result = test.run()
 
631
        total_failures = result.errors + result.failures
612
632
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
633
            self.assertEqual(1, len(total_failures))
614
634
        else:
615
635
            # When _lock_check_thorough is disabled, then we don't trigger a
616
636
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
637
            self.assertEqual(0, len(total_failures))
618
638
 
619
639
 
620
640
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
641
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
642
 
623
643
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
644
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
645
        self.vfs_transport_factory = memory.MemoryServer
627
646
        self.transport_readonly_server = None
629
648
        # for the server
630
649
        url = self.get_readonly_url()
631
650
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
651
        t = transport.get_transport(url)
 
652
        t2 = transport.get_transport(url2)
634
653
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
654
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
655
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
656
 
638
657
    def test_get_readonly_url_http(self):
639
658
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
659
        from bzrlib.transport.http import HttpTransportBase
642
660
        self.transport_server = test_server.LocalURLServer
643
661
        self.transport_readonly_server = HttpServer
645
663
        url = self.get_readonly_url()
646
664
        url2 = self.get_readonly_url('foo/bar')
647
665
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
666
        t = transport.get_transport(url)
 
667
        t2 = transport.get_transport(url2)
650
668
        self.failUnless(isinstance(t, HttpTransportBase))
651
669
        self.failUnless(isinstance(t2, HttpTransportBase))
652
670
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
708
class TestChrootedTest(tests.ChrootedTestCase):
691
709
 
692
710
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
711
        t = transport.get_transport(self.get_readonly_url())
695
712
        url = t.base
696
713
        self.assertEqual(url, t.clone('..').base)
697
714
 
803
820
        self.requireFeature(test_lsprof.LSProfFeature)
804
821
        result_stream = StringIO()
805
822
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
823
            result_stream,
807
824
            descriptions=0,
808
825
            verbosity=2,
809
826
            )
835
852
        self.assertContainsRe(output,
836
853
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
854
 
 
855
    def test_uses_time_from_testtools(self):
 
856
        """Test case timings in verbose results should use testtools times"""
 
857
        import datetime
 
858
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
859
            def startTest(self, test):
 
860
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
861
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
862
            def addSuccess(self, test):
 
863
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
864
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
865
            def report_tests_starting(self): pass
 
866
        sio = StringIO()
 
867
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
868
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
869
 
838
870
    def test_known_failure(self):
839
871
        """A KnownFailure being raised should trigger several result actions."""
840
872
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
873
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
874
            def report_tests_starting(self): pass
844
875
            def report_known_failure(self, test, err=None, details=None):
845
876
                self._call = test, 'known failure'
846
877
        result = InstrumentedTestResult(None, None, None, None)
864
895
        # verbose test output formatting
865
896
        result_stream = StringIO()
866
897
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
898
            result_stream,
868
899
            descriptions=0,
869
900
            verbosity=2,
870
901
            )
880
911
        output = result_stream.getvalue()[prefix:]
881
912
        lines = output.splitlines()
882
913
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
914
        if sys.version_info > (2, 7):
 
915
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
916
                self.assertNotEqual, lines[1], '    ')
883
917
        self.assertEqual(lines[1], '    foo')
884
918
        self.assertEqual(2, len(lines))
885
919
 
893
927
        """Test the behaviour of invoking addNotSupported."""
894
928
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
929
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
930
            def report_tests_starting(self): pass
898
931
            def report_unsupported(self, test, feature):
899
932
                self._call = test, feature
900
933
        result = InstrumentedTestResult(None, None, None, None)
919
952
        # verbose test output formatting
920
953
        result_stream = StringIO()
921
954
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
955
            result_stream,
923
956
            descriptions=0,
924
957
            verbosity=2,
925
958
            )
939
972
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
973
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
974
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
975
            def report_tests_starting(self): pass
944
976
            def addNotSupported(self, test, feature):
945
977
                self._call = test, feature
946
978
        result = InstrumentedTestResult(None, None, None, None)
988
1020
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1021
            calls = 0
990
1022
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1023
        result = InstrumentedTestResult(None, None, None, None)
993
1024
        def test_function():
994
1025
            pass
996
1027
        test.run(result)
997
1028
        self.assertEquals(1, result.calls)
998
1029
 
 
1030
    def test_startTests_only_once(self):
 
1031
        """With multiple tests startTests should still only be called once"""
 
1032
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1033
            calls = 0
 
1034
            def startTests(self): self.calls += 1
 
1035
        result = InstrumentedTestResult(None, None, None, None)
 
1036
        suite = unittest.TestSuite([
 
1037
            unittest.FunctionTestCase(lambda: None),
 
1038
            unittest.FunctionTestCase(lambda: None)])
 
1039
        suite.run(result)
 
1040
        self.assertEquals(1, result.calls)
 
1041
        self.assertEquals(2, result.count)
 
1042
 
999
1043
 
1000
1044
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1045
 
1022
1066
        because of our use of global state.
1023
1067
        """
1024
1068
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1069
        try:
1027
1070
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1071
            return testrunner.run(test)
1030
1072
        finally:
1031
1073
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1074
 
1034
1075
    def test_known_failure_failed_run(self):
1035
1076
        # run a test that generates a known failure which should be printed in
1081
1122
    def test_result_decorator(self):
1082
1123
        # decorate results
1083
1124
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1125
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1126
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1127
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1128
                calls.append('start')
1088
1129
        test = unittest.FunctionTestCase(lambda:None)
1089
1130
        stream = StringIO()
1213
1254
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1255
        self.assertLength(1, self._get_source_tree_calls)
1215
1256
 
 
1257
    def test_verbose_test_count(self):
 
1258
        """A verbose test run reports the right test count at the start"""
 
1259
        suite = TestUtil.TestSuite([
 
1260
            unittest.FunctionTestCase(lambda:None),
 
1261
            unittest.FunctionTestCase(lambda:None)])
 
1262
        self.assertEqual(suite.countTestCases(), 2)
 
1263
        stream = StringIO()
 
1264
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1265
        # Need to use the CountingDecorator as that's what sets num_tests
 
1266
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1267
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1268
 
1216
1269
    def test_startTestRun(self):
1217
1270
        """run should call result.startTestRun()"""
1218
1271
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1272
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1273
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1274
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1275
                calls.append('startTestRun')
1223
1276
        test = unittest.FunctionTestCase(lambda:None)
1224
1277
        stream = StringIO()
1230
1283
    def test_stopTestRun(self):
1231
1284
        """run should call result.stopTestRun()"""
1232
1285
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1286
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1287
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1288
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1289
                calls.append('stopTestRun')
1237
1290
        test = unittest.FunctionTestCase(lambda:None)
1238
1291
        stream = StringIO()
1421
1474
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1475
        output_stream = StringIO()
1423
1476
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1477
            output_stream,
1425
1478
            descriptions=0,
1426
1479
            verbosity=2)
1427
1480
        sample_test.run(result)
1655
1708
        self.assertEqual('original', obj.test_attr)
1656
1709
 
1657
1710
 
 
1711
class _MissingFeature(tests.Feature):
 
1712
    def _probe(self):
 
1713
        return False
 
1714
missing_feature = _MissingFeature()
 
1715
 
 
1716
 
 
1717
def _get_test(name):
 
1718
    """Get an instance of a specific example test.
 
1719
 
 
1720
    We protect this in a function so that they don't auto-run in the test
 
1721
    suite.
 
1722
    """
 
1723
 
 
1724
    class ExampleTests(tests.TestCase):
 
1725
 
 
1726
        def test_fail(self):
 
1727
            mutter('this was a failing test')
 
1728
            self.fail('this test will fail')
 
1729
 
 
1730
        def test_error(self):
 
1731
            mutter('this test errored')
 
1732
            raise RuntimeError('gotcha')
 
1733
 
 
1734
        def test_missing_feature(self):
 
1735
            mutter('missing the feature')
 
1736
            self.requireFeature(missing_feature)
 
1737
 
 
1738
        def test_skip(self):
 
1739
            mutter('this test will be skipped')
 
1740
            raise tests.TestSkipped('reason')
 
1741
 
 
1742
        def test_success(self):
 
1743
            mutter('this test succeeds')
 
1744
 
 
1745
        def test_xfail(self):
 
1746
            mutter('test with expected failure')
 
1747
            self.knownFailure('this_fails')
 
1748
 
 
1749
        def test_unexpected_success(self):
 
1750
            mutter('test with unexpected success')
 
1751
            self.expectFailure('should_fail', lambda: None)
 
1752
 
 
1753
    return ExampleTests(name)
 
1754
 
 
1755
 
 
1756
class TestTestCaseLogDetails(tests.TestCase):
 
1757
 
 
1758
    def _run_test(self, test_name):
 
1759
        test = _get_test(test_name)
 
1760
        result = testtools.TestResult()
 
1761
        test.run(result)
 
1762
        return result
 
1763
 
 
1764
    def test_fail_has_log(self):
 
1765
        result = self._run_test('test_fail')
 
1766
        self.assertEqual(1, len(result.failures))
 
1767
        result_content = result.failures[0][1]
 
1768
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1769
        self.assertContainsRe(result_content, 'this was a failing test')
 
1770
 
 
1771
    def test_error_has_log(self):
 
1772
        result = self._run_test('test_error')
 
1773
        self.assertEqual(1, len(result.errors))
 
1774
        result_content = result.errors[0][1]
 
1775
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1776
        self.assertContainsRe(result_content, 'this test errored')
 
1777
 
 
1778
    def test_skip_has_no_log(self):
 
1779
        result = self._run_test('test_skip')
 
1780
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1781
        skips = result.skip_reasons['reason']
 
1782
        self.assertEqual(1, len(skips))
 
1783
        test = skips[0]
 
1784
        self.assertFalse('log' in test.getDetails())
 
1785
 
 
1786
    def test_missing_feature_has_no_log(self):
 
1787
        # testtools doesn't know about addNotSupported, so it just gets
 
1788
        # considered as a skip
 
1789
        result = self._run_test('test_missing_feature')
 
1790
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1791
        skips = result.skip_reasons[missing_feature]
 
1792
        self.assertEqual(1, len(skips))
 
1793
        test = skips[0]
 
1794
        self.assertFalse('log' in test.getDetails())
 
1795
 
 
1796
    def test_xfail_has_no_log(self):
 
1797
        result = self._run_test('test_xfail')
 
1798
        self.assertEqual(1, len(result.expectedFailures))
 
1799
        result_content = result.expectedFailures[0][1]
 
1800
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1801
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1802
 
 
1803
    def test_unexpected_success_has_log(self):
 
1804
        result = self._run_test('test_unexpected_success')
 
1805
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1806
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1807
        # expectedFailures is a list of reasons?
 
1808
        test = result.unexpectedSuccesses[0]
 
1809
        details = test.getDetails()
 
1810
        self.assertTrue('log' in details)
 
1811
 
 
1812
 
 
1813
class TestTestCloning(tests.TestCase):
 
1814
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1815
 
 
1816
    def test_cloned_testcase_does_not_share_details(self):
 
1817
        """A TestCase cloned with clone_test does not share mutable attributes
 
1818
        such as details or cleanups.
 
1819
        """
 
1820
        class Test(tests.TestCase):
 
1821
            def test_foo(self):
 
1822
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1823
        orig_test = Test('test_foo')
 
1824
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1825
        orig_test.run(unittest.TestResult())
 
1826
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1827
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1828
 
 
1829
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1830
        """Applying two levels of scenarios to a test preserves the attributes
 
1831
        added by both scenarios.
 
1832
        """
 
1833
        class Test(tests.TestCase):
 
1834
            def test_foo(self):
 
1835
                pass
 
1836
        test = Test('test_foo')
 
1837
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1838
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1839
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1840
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1841
        all_tests = list(tests.iter_suite_tests(suite))
 
1842
        self.assertLength(4, all_tests)
 
1843
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1844
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1845
 
 
1846
 
1658
1847
# NB: Don't delete this; it's not actually from 0.11!
1659
1848
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1849
def sample_deprecated_function():
1814
2003
                tree.branch.repository.bzrdir.root_transport)
1815
2004
 
1816
2005
 
1817
 
class SelfTestHelper:
 
2006
class SelfTestHelper(object):
1818
2007
 
1819
2008
    def run_selftest(self, **kwargs):
1820
2009
        """Run selftest returning its output."""
1880
2069
            def __call__(test, result):
1881
2070
                test.run(result)
1882
2071
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2072
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2073
                calls.append("called")
1885
2074
            def countTestCases(self):
1886
2075
                return 1
1971
2160
            load_list='missing file name', list_only=True)
1972
2161
 
1973
2162
 
 
2163
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2164
 
 
2165
    _test_needs_features = [features.subunit]
 
2166
 
 
2167
    def run_subunit_stream(self, test_name):
 
2168
        from subunit import ProtocolTestCase
 
2169
        def factory():
 
2170
            return TestUtil.TestSuite([_get_test(test_name)])
 
2171
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2172
            test_suite_factory=factory)
 
2173
        test = ProtocolTestCase(stream)
 
2174
        result = testtools.TestResult()
 
2175
        test.run(result)
 
2176
        content = stream.getvalue()
 
2177
        return content, result
 
2178
 
 
2179
    def test_fail_has_log(self):
 
2180
        content, result = self.run_subunit_stream('test_fail')
 
2181
        self.assertEqual(1, len(result.failures))
 
2182
        self.assertContainsRe(content, '(?m)^log$')
 
2183
        self.assertContainsRe(content, 'this test will fail')
 
2184
 
 
2185
    def test_error_has_log(self):
 
2186
        content, result = self.run_subunit_stream('test_error')
 
2187
        self.assertContainsRe(content, '(?m)^log$')
 
2188
        self.assertContainsRe(content, 'this test errored')
 
2189
 
 
2190
    def test_skip_has_no_log(self):
 
2191
        content, result = self.run_subunit_stream('test_skip')
 
2192
        self.assertNotContainsRe(content, '(?m)^log$')
 
2193
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2194
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2195
        skips = result.skip_reasons['reason']
 
2196
        self.assertEqual(1, len(skips))
 
2197
        test = skips[0]
 
2198
        # RemotedTestCase doesn't preserve the "details"
 
2199
        ## self.assertFalse('log' in test.getDetails())
 
2200
 
 
2201
    def test_missing_feature_has_no_log(self):
 
2202
        content, result = self.run_subunit_stream('test_missing_feature')
 
2203
        self.assertNotContainsRe(content, '(?m)^log$')
 
2204
        self.assertNotContainsRe(content, 'missing the feature')
 
2205
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2206
        skips = result.skip_reasons['_MissingFeature\n']
 
2207
        self.assertEqual(1, len(skips))
 
2208
        test = skips[0]
 
2209
        # RemotedTestCase doesn't preserve the "details"
 
2210
        ## self.assertFalse('log' in test.getDetails())
 
2211
 
 
2212
    def test_xfail_has_no_log(self):
 
2213
        content, result = self.run_subunit_stream('test_xfail')
 
2214
        self.assertNotContainsRe(content, '(?m)^log$')
 
2215
        self.assertNotContainsRe(content, 'test with expected failure')
 
2216
        self.assertEqual(1, len(result.expectedFailures))
 
2217
        result_content = result.expectedFailures[0][1]
 
2218
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2219
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2220
 
 
2221
    def test_unexpected_success_has_log(self):
 
2222
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2223
        self.assertContainsRe(content, '(?m)^log$')
 
2224
        self.assertContainsRe(content, 'test with unexpected success')
 
2225
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2226
                           ' as a plain success',
 
2227
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2228
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2229
        test = result.unexpectedSuccesses[0]
 
2230
        # RemotedTestCase doesn't preserve the "details"
 
2231
        ## self.assertTrue('log' in test.getDetails())
 
2232
 
 
2233
    def test_success_has_no_log(self):
 
2234
        content, result = self.run_subunit_stream('test_success')
 
2235
        self.assertEqual(1, result.testsRun)
 
2236
        self.assertNotContainsRe(content, '(?m)^log$')
 
2237
        self.assertNotContainsRe(content, 'this test succeeds')
 
2238
 
 
2239
 
1974
2240
class TestRunBzr(tests.TestCase):
1975
2241
 
1976
2242
    out = ''
2339
2605
            os.chdir = orig_chdir
2340
2606
        self.assertEqual(['foo', 'current'], chdirs)
2341
2607
 
 
2608
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2609
        self.get_source_path = lambda: ""
 
2610
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2611
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2612
 
2342
2613
 
2343
2614
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2615
    """Tests that really need to do things with an external bzr."""
2960
3231
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3232
 
2962
3233
 
 
3234
class TestThreadLeakDetection(tests.TestCase):
 
3235
    """Ensure when tests leak threads we detect and report it"""
 
3236
 
 
3237
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3238
        def __init__(self):
 
3239
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3240
            self.leaks = []
 
3241
        def _report_thread_leak(self, test, leaks, alive):
 
3242
            self.leaks.append((test, leaks))
 
3243
 
 
3244
    def test_testcase_without_addCleanups(self):
 
3245
        """Check old TestCase instances don't break with leak detection"""
 
3246
        class Test(unittest.TestCase):
 
3247
            def runTest(self):
 
3248
                pass
 
3249
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3250
        result = self.LeakRecordingResult()
 
3251
        test = Test()
 
3252
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3253
        result.startTestRun()
 
3254
        test.run(result)
 
3255
        result.stopTestRun()
 
3256
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3257
        self.assertEqual(result.leaks, [])
 
3258
        
 
3259
    def test_thread_leak(self):
 
3260
        """Ensure a thread that outlives the running of a test is reported
 
3261
 
 
3262
        Uses a thread that blocks on an event, and is started by the inner
 
3263
        test case. As the thread outlives the inner case's run, it should be
 
3264
        detected as a leak, but the event is then set so that the thread can
 
3265
        be safely joined in cleanup so it's not leaked for real.
 
3266
        """
 
3267
        event = threading.Event()
 
3268
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3269
        class Test(tests.TestCase):
 
3270
            def test_leak(self):
 
3271
                thread.start()
 
3272
        result = self.LeakRecordingResult()
 
3273
        test = Test("test_leak")
 
3274
        self.addCleanup(thread.join)
 
3275
        self.addCleanup(event.set)
 
3276
        result.startTestRun()
 
3277
        test.run(result)
 
3278
        result.stopTestRun()
 
3279
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3280
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3281
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3282
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3283
 
 
3284
    def test_multiple_leaks(self):
 
3285
        """Check multiple leaks are blamed on the test cases at fault
 
3286
 
 
3287
        Same concept as the previous test, but has one inner test method that
 
3288
        leaks two threads, and one that doesn't leak at all.
 
3289
        """
 
3290
        event = threading.Event()
 
3291
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3292
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3293
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3294
        class Test(tests.TestCase):
 
3295
            def test_first_leak(self):
 
3296
                thread_b.start()
 
3297
            def test_second_no_leak(self):
 
3298
                pass
 
3299
            def test_third_leak(self):
 
3300
                thread_c.start()
 
3301
                thread_a.start()
 
3302
        result = self.LeakRecordingResult()
 
3303
        first_test = Test("test_first_leak")
 
3304
        third_test = Test("test_third_leak")
 
3305
        self.addCleanup(thread_a.join)
 
3306
        self.addCleanup(thread_b.join)
 
3307
        self.addCleanup(thread_c.join)
 
3308
        self.addCleanup(event.set)
 
3309
        result.startTestRun()
 
3310
        unittest.TestSuite(
 
3311
            [first_test, Test("test_second_no_leak"), third_test]
 
3312
            ).run(result)
 
3313
        result.stopTestRun()
 
3314
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3315
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3316
        self.assertEqual(result.leaks, [
 
3317
            (first_test, set([thread_b])),
 
3318
            (third_test, set([thread_a, thread_c]))])
 
3319
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3320
 
 
3321
 
 
3322
class TestPostMortemDebugging(tests.TestCase):
 
3323
    """Check post mortem debugging works when tests fail or error"""
 
3324
 
 
3325
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3326
        def __init__(self):
 
3327
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3328
            self.postcode = None
 
3329
        def _post_mortem(self, tb=None):
 
3330
            """Record the code object at the end of the current traceback"""
 
3331
            tb = tb or sys.exc_info()[2]
 
3332
            if tb is not None:
 
3333
                next = tb.tb_next
 
3334
                while next is not None:
 
3335
                    tb = next
 
3336
                    next = next.tb_next
 
3337
                self.postcode = tb.tb_frame.f_code
 
3338
        def report_error(self, test, err):
 
3339
            pass
 
3340
        def report_failure(self, test, err):
 
3341
            pass
 
3342
 
 
3343
    def test_location_unittest_error(self):
 
3344
        """Needs right post mortem traceback with erroring unittest case"""
 
3345
        class Test(unittest.TestCase):
 
3346
            def runTest(self):
 
3347
                raise RuntimeError
 
3348
        result = self.TracebackRecordingResult()
 
3349
        Test().run(result)
 
3350
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3351
 
 
3352
    def test_location_unittest_failure(self):
 
3353
        """Needs right post mortem traceback with failing unittest case"""
 
3354
        class Test(unittest.TestCase):
 
3355
            def runTest(self):
 
3356
                raise self.failureException
 
3357
        result = self.TracebackRecordingResult()
 
3358
        Test().run(result)
 
3359
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3360
 
 
3361
    def test_location_bt_error(self):
 
3362
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3363
        class Test(tests.TestCase):
 
3364
            def test_error(self):
 
3365
                raise RuntimeError
 
3366
        result = self.TracebackRecordingResult()
 
3367
        Test("test_error").run(result)
 
3368
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3369
 
 
3370
    def test_location_bt_failure(self):
 
3371
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3372
        class Test(tests.TestCase):
 
3373
            def test_failure(self):
 
3374
                raise self.failureException
 
3375
        result = self.TracebackRecordingResult()
 
3376
        Test("test_failure").run(result)
 
3377
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3378
 
 
3379
    def test_env_var_triggers_post_mortem(self):
 
3380
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3381
        import pdb
 
3382
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3383
        post_mortem_calls = []
 
3384
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3385
        self.addCleanup(osutils.set_or_unset_env, "BZR_TEST_PDB",
 
3386
            osutils.set_or_unset_env("BZR_TEST_PDB", None))
 
3387
        result._post_mortem(1)
 
3388
        os.environ["BZR_TEST_PDB"] = "on"
 
3389
        result._post_mortem(2)
 
3390
        self.assertEqual([2], post_mortem_calls)
 
3391
 
 
3392
 
2963
3393
class TestRunSuite(tests.TestCase):
2964
3394
 
2965
3395
    def test_runner_class(self):