595
626
test = TestDanglingLock('test_function')
596
627
result = test.run()
628
total_failures = result.errors + result.failures
597
629
if self._lock_check_thorough:
598
self.assertEqual(1, len(result.errors))
630
self.assertEqual(1, len(total_failures))
600
632
# When _lock_check_thorough is disabled, then we don't trigger a
602
self.assertEqual(0, len(result.errors))
634
self.assertEqual(0, len(total_failures))
605
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
606
638
"""Tests for the convenience functions TestCaseWithTransport introduces."""
608
640
def test_get_readonly_url_none(self):
609
from bzrlib.transport import get_transport
610
from bzrlib.transport.memory import MemoryServer
611
641
from bzrlib.transport.readonly import ReadonlyTransportDecorator
612
self.vfs_transport_factory = MemoryServer
642
self.vfs_transport_factory = memory.MemoryServer
613
643
self.transport_readonly_server = None
614
644
# calling get_readonly_transport() constructs a decorator on the url
616
646
url = self.get_readonly_url()
617
647
url2 = self.get_readonly_url('foo/bar')
618
t = get_transport(url)
619
t2 = get_transport(url2)
648
t = transport.get_transport(url)
649
t2 = transport.get_transport(url2)
620
650
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
621
651
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
622
652
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
624
654
def test_get_readonly_url_http(self):
625
655
from bzrlib.tests.http_server import HttpServer
626
from bzrlib.transport import get_transport
627
from bzrlib.transport.local import LocalURLServer
628
656
from bzrlib.transport.http import HttpTransportBase
629
self.transport_server = LocalURLServer
657
self.transport_server = test_server.LocalURLServer
630
658
self.transport_readonly_server = HttpServer
631
659
# calling get_readonly_transport() gives us a HTTP server instance.
632
660
url = self.get_readonly_url()
633
661
url2 = self.get_readonly_url('foo/bar')
634
662
# the transport returned may be any HttpTransportBase subclass
635
t = get_transport(url)
636
t2 = get_transport(url2)
663
t = transport.get_transport(url)
664
t2 = transport.get_transport(url2)
637
665
self.failUnless(isinstance(t, HttpTransportBase))
638
666
self.failUnless(isinstance(t2, HttpTransportBase))
639
667
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
1014
1048
because of our use of global state.
1016
1050
old_root = tests.TestCaseInTempDir.TEST_ROOT
1017
old_leak = tests.TestCase._first_thread_leaker_id
1019
1052
tests.TestCaseInTempDir.TEST_ROOT = None
1020
tests.TestCase._first_thread_leaker_id = None
1021
1053
return testrunner.run(test)
1023
1055
tests.TestCaseInTempDir.TEST_ROOT = old_root
1024
tests.TestCase._first_thread_leaker_id = old_leak
1026
1057
def test_known_failure_failed_run(self):
1027
1058
# run a test that generates a known failure which should be printed in
1028
1059
# the final output when real failures occur.
1029
1060
class Test(tests.TestCase):
1030
1061
def known_failure_test(self):
1031
raise tests.KnownFailure('failed')
1062
self.expectFailure('failed', self.assertTrue, False)
1032
1063
test = unittest.TestSuite()
1033
1064
test.addTest(Test("known_failure_test"))
1034
1065
def failing_test():
1035
raise AssertionError('foo')
1036
1067
test.addTest(unittest.FunctionTestCase(failing_test))
1037
1068
stream = StringIO()
1038
1069
runner = tests.TextTestRunner(stream=stream)
1039
1070
result = self.run_test_runner(runner, test)
1040
1071
lines = stream.getvalue().splitlines()
1041
1072
self.assertContainsRe(stream.getvalue(),
1073
'(?sm)^bzr selftest.*$'
1044
1075
'^======================================================================\n'
1045
'^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
1076
'^FAIL: failing_test\n'
1046
1077
'^----------------------------------------------------------------------\n'
1047
1078
'Traceback \\(most recent call last\\):\n'
1048
1079
' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1049
' raise AssertionError\\(\'foo\'\\)\n'
1080
' self.fail\\(\'foo\'\\)\n'
1051
1082
'^----------------------------------------------------------------------\n'
1209
1236
self.assertContainsRe(output_string, "--date [0-9.]+")
1210
1237
self.assertLength(1, self._get_source_tree_calls)
1212
def assertLogDeleted(self, test):
1213
log = test._get_log()
1214
self.assertEqual("DELETED log file to reduce memory footprint", log)
1215
self.assertEqual('', test._log_contents)
1216
self.assertIs(None, test._log_file_name)
1218
def test_success_log_deleted(self):
1219
"""Successful tests have their log deleted"""
1221
class LogTester(tests.TestCase):
1223
def test_success(self):
1224
self.log('this will be removed\n')
1227
runner = tests.TextTestRunner(stream=sio)
1228
test = LogTester('test_success')
1229
result = self.run_test_runner(runner, test)
1231
self.assertLogDeleted(test)
1233
def test_skipped_log_deleted(self):
1234
"""Skipped tests have their log deleted"""
1236
class LogTester(tests.TestCase):
1238
def test_skipped(self):
1239
self.log('this will be removed\n')
1240
raise tests.TestSkipped()
1243
runner = tests.TextTestRunner(stream=sio)
1244
test = LogTester('test_skipped')
1245
result = self.run_test_runner(runner, test)
1247
self.assertLogDeleted(test)
1249
def test_not_aplicable_log_deleted(self):
1250
"""Not applicable tests have their log deleted"""
1252
class LogTester(tests.TestCase):
1254
def test_not_applicable(self):
1255
self.log('this will be removed\n')
1256
raise tests.TestNotApplicable()
1259
runner = tests.TextTestRunner(stream=sio)
1260
test = LogTester('test_not_applicable')
1261
result = self.run_test_runner(runner, test)
1263
self.assertLogDeleted(test)
1265
def test_known_failure_log_deleted(self):
1266
"""Know failure tests have their log deleted"""
1268
class LogTester(tests.TestCase):
1270
def test_known_failure(self):
1271
self.log('this will be removed\n')
1272
raise tests.KnownFailure()
1275
runner = tests.TextTestRunner(stream=sio)
1276
test = LogTester('test_known_failure')
1277
result = self.run_test_runner(runner, test)
1279
self.assertLogDeleted(test)
1281
def test_fail_log_kept(self):
1282
"""Failed tests have their log kept"""
1284
class LogTester(tests.TestCase):
1286
def test_fail(self):
1287
self.log('this will be kept\n')
1288
self.fail('this test fails')
1291
runner = tests.TextTestRunner(stream=sio)
1292
test = LogTester('test_fail')
1293
result = self.run_test_runner(runner, test)
1295
text = sio.getvalue()
1296
self.assertContainsRe(text, 'this will be kept')
1297
self.assertContainsRe(text, 'this test fails')
1299
log = test._get_log()
1300
self.assertContainsRe(log, 'this will be kept')
1301
self.assertEqual(log, test._log_contents)
1303
def test_error_log_kept(self):
1304
"""Tests with errors have their log kept"""
1306
class LogTester(tests.TestCase):
1308
def test_error(self):
1309
self.log('this will be kept\n')
1310
raise ValueError('random exception raised')
1313
runner = tests.TextTestRunner(stream=sio)
1314
test = LogTester('test_error')
1315
result = self.run_test_runner(runner, test)
1317
text = sio.getvalue()
1318
self.assertContainsRe(text, 'this will be kept')
1319
self.assertContainsRe(text, 'random exception raised')
1321
log = test._get_log()
1322
self.assertContainsRe(log, 'this will be kept')
1323
self.assertEqual(log, test._log_contents)
1239
def test_verbose_test_count(self):
1240
"""A verbose test run reports the right test count at the start"""
1241
suite = TestUtil.TestSuite([
1242
unittest.FunctionTestCase(lambda:None),
1243
unittest.FunctionTestCase(lambda:None)])
1244
self.assertEqual(suite.countTestCases(), 2)
1246
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
# Need to use the CountingDecorator as that's what sets num_tests
1248
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1325
1251
def test_startTestRun(self):
1326
1252
"""run should call result.startTestRun()"""
1728
1653
self.assertRaises(AssertionError,
1729
1654
self.assertListRaises, _TestException, success_generator)
1656
def test_overrideAttr_without_value(self):
1657
self.test_attr = 'original' # Define a test attribute
1658
obj = self # Make 'obj' visible to the embedded test
1659
class Test(tests.TestCase):
1662
tests.TestCase.setUp(self)
1663
self.orig = self.overrideAttr(obj, 'test_attr')
1665
def test_value(self):
1666
self.assertEqual('original', self.orig)
1667
self.assertEqual('original', obj.test_attr)
1668
obj.test_attr = 'modified'
1669
self.assertEqual('modified', obj.test_attr)
1671
test = Test('test_value')
1672
test.run(unittest.TestResult())
1673
self.assertEqual('original', obj.test_attr)
1675
def test_overrideAttr_with_value(self):
1676
self.test_attr = 'original' # Define a test attribute
1677
obj = self # Make 'obj' visible to the embedded test
1678
class Test(tests.TestCase):
1681
tests.TestCase.setUp(self)
1682
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1684
def test_value(self):
1685
self.assertEqual('original', self.orig)
1686
self.assertEqual('modified', obj.test_attr)
1688
test = Test('test_value')
1689
test.run(unittest.TestResult())
1690
self.assertEqual('original', obj.test_attr)
1693
class _MissingFeature(tests.Feature):
1696
missing_feature = _MissingFeature()
1699
def _get_test(name):
1700
"""Get an instance of a specific example test.
1702
We protect this in a function so that they don't auto-run in the test
1706
class ExampleTests(tests.TestCase):
1708
def test_fail(self):
1709
mutter('this was a failing test')
1710
self.fail('this test will fail')
1712
def test_error(self):
1713
mutter('this test errored')
1714
raise RuntimeError('gotcha')
1716
def test_missing_feature(self):
1717
mutter('missing the feature')
1718
self.requireFeature(missing_feature)
1720
def test_skip(self):
1721
mutter('this test will be skipped')
1722
raise tests.TestSkipped('reason')
1724
def test_success(self):
1725
mutter('this test succeeds')
1727
def test_xfail(self):
1728
mutter('test with expected failure')
1729
self.knownFailure('this_fails')
1731
def test_unexpected_success(self):
1732
mutter('test with unexpected success')
1733
self.expectFailure('should_fail', lambda: None)
1735
return ExampleTests(name)
1738
class TestTestCaseLogDetails(tests.TestCase):
1740
def _run_test(self, test_name):
1741
test = _get_test(test_name)
1742
result = testtools.TestResult()
1746
def test_fail_has_log(self):
1747
result = self._run_test('test_fail')
1748
self.assertEqual(1, len(result.failures))
1749
result_content = result.failures[0][1]
1750
self.assertContainsRe(result_content, 'Text attachment: log')
1751
self.assertContainsRe(result_content, 'this was a failing test')
1753
def test_error_has_log(self):
1754
result = self._run_test('test_error')
1755
self.assertEqual(1, len(result.errors))
1756
result_content = result.errors[0][1]
1757
self.assertContainsRe(result_content, 'Text attachment: log')
1758
self.assertContainsRe(result_content, 'this test errored')
1760
def test_skip_has_no_log(self):
1761
result = self._run_test('test_skip')
1762
self.assertEqual(['reason'], result.skip_reasons.keys())
1763
skips = result.skip_reasons['reason']
1764
self.assertEqual(1, len(skips))
1766
self.assertFalse('log' in test.getDetails())
1768
def test_missing_feature_has_no_log(self):
1769
# testtools doesn't know about addNotSupported, so it just gets
1770
# considered as a skip
1771
result = self._run_test('test_missing_feature')
1772
self.assertEqual([missing_feature], result.skip_reasons.keys())
1773
skips = result.skip_reasons[missing_feature]
1774
self.assertEqual(1, len(skips))
1776
self.assertFalse('log' in test.getDetails())
1778
def test_xfail_has_no_log(self):
1779
result = self._run_test('test_xfail')
1780
self.assertEqual(1, len(result.expectedFailures))
1781
result_content = result.expectedFailures[0][1]
1782
self.assertNotContainsRe(result_content, 'Text attachment: log')
1783
self.assertNotContainsRe(result_content, 'test with expected failure')
1785
def test_unexpected_success_has_log(self):
1786
result = self._run_test('test_unexpected_success')
1787
self.assertEqual(1, len(result.unexpectedSuccesses))
1788
# Inconsistency, unexpectedSuccesses is a list of tests,
1789
# expectedFailures is a list of reasons?
1790
test = result.unexpectedSuccesses[0]
1791
details = test.getDetails()
1792
self.assertTrue('log' in details)
1795
class TestTestCloning(tests.TestCase):
1796
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1798
def test_cloned_testcase_does_not_share_details(self):
1799
"""A TestCase cloned with clone_test does not share mutable attributes
1800
such as details or cleanups.
1802
class Test(tests.TestCase):
1804
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
orig_test = Test('test_foo')
1806
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
orig_test.run(unittest.TestResult())
1808
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1811
def test_double_apply_scenario_preserves_first_scenario(self):
1812
"""Applying two levels of scenarios to a test preserves the attributes
1813
added by both scenarios.
1815
class Test(tests.TestCase):
1818
test = Test('test_foo')
1819
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
all_tests = list(tests.iter_suite_tests(suite))
1824
self.assertLength(4, all_tests)
1825
all_xys = sorted((t.x, t.y) for t in all_tests)
1826
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1732
1829
# NB: Don't delete this; it's not actually from 0.11!
1733
1830
@deprecated_function(deprecated_in((0, 11, 0)))
2048
2142
load_list='missing file name', list_only=True)
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2147
_test_needs_features = [features.subunit]
2149
def run_subunit_stream(self, test_name):
2150
from subunit import ProtocolTestCase
2152
return TestUtil.TestSuite([_get_test(test_name)])
2153
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2154
test_suite_factory=factory)
2155
test = ProtocolTestCase(stream)
2156
result = testtools.TestResult()
2158
content = stream.getvalue()
2159
return content, result
2161
def test_fail_has_log(self):
2162
content, result = self.run_subunit_stream('test_fail')
2163
self.assertEqual(1, len(result.failures))
2164
self.assertContainsRe(content, '(?m)^log$')
2165
self.assertContainsRe(content, 'this test will fail')
2167
def test_error_has_log(self):
2168
content, result = self.run_subunit_stream('test_error')
2169
self.assertContainsRe(content, '(?m)^log$')
2170
self.assertContainsRe(content, 'this test errored')
2172
def test_skip_has_no_log(self):
2173
content, result = self.run_subunit_stream('test_skip')
2174
self.assertNotContainsRe(content, '(?m)^log$')
2175
self.assertNotContainsRe(content, 'this test will be skipped')
2176
self.assertEqual(['reason'], result.skip_reasons.keys())
2177
skips = result.skip_reasons['reason']
2178
self.assertEqual(1, len(skips))
2180
# RemotedTestCase doesn't preserve the "details"
2181
## self.assertFalse('log' in test.getDetails())
2183
def test_missing_feature_has_no_log(self):
2184
content, result = self.run_subunit_stream('test_missing_feature')
2185
self.assertNotContainsRe(content, '(?m)^log$')
2186
self.assertNotContainsRe(content, 'missing the feature')
2187
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2188
skips = result.skip_reasons['_MissingFeature\n']
2189
self.assertEqual(1, len(skips))
2191
# RemotedTestCase doesn't preserve the "details"
2192
## self.assertFalse('log' in test.getDetails())
2194
def test_xfail_has_no_log(self):
2195
content, result = self.run_subunit_stream('test_xfail')
2196
self.assertNotContainsRe(content, '(?m)^log$')
2197
self.assertNotContainsRe(content, 'test with expected failure')
2198
self.assertEqual(1, len(result.expectedFailures))
2199
result_content = result.expectedFailures[0][1]
2200
self.assertNotContainsRe(result_content, 'Text attachment: log')
2201
self.assertNotContainsRe(result_content, 'test with expected failure')
2203
def test_unexpected_success_has_log(self):
2204
content, result = self.run_subunit_stream('test_unexpected_success')
2205
self.assertContainsRe(content, '(?m)^log$')
2206
self.assertContainsRe(content, 'test with unexpected success')
2207
self.expectFailure('subunit treats "unexpectedSuccess"'
2208
' as a plain success',
2209
self.assertEqual, 1, len(result.unexpectedSuccesses))
2210
self.assertEqual(1, len(result.unexpectedSuccesses))
2211
test = result.unexpectedSuccesses[0]
2212
# RemotedTestCase doesn't preserve the "details"
2213
## self.assertTrue('log' in test.getDetails())
2215
def test_success_has_no_log(self):
2216
content, result = self.run_subunit_stream('test_success')
2217
self.assertEqual(1, result.testsRun)
2218
self.assertNotContainsRe(content, '(?m)^log$')
2219
self.assertNotContainsRe(content, 'this test succeeds')
2051
2222
class TestRunBzr(tests.TestCase):
2868
3042
# test doubles that supply a few sample tests to load, and check they
2871
def _test_suite_testmod_names():
3045
def testmod_names():
2872
3046
calls.append("testmod_names")
2874
3048
'bzrlib.tests.blackbox.test_branch',
2875
3049
'bzrlib.tests.per_transport',
2876
3050
'bzrlib.tests.test_selftest',
2878
original_testmod_names = tests._test_suite_testmod_names
2879
def _test_suite_modules_to_doctest():
3052
self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
2880
3054
calls.append("modules_to_doctest")
2881
3057
return ['bzrlib.timestamp']
2882
orig_modules_to_doctest = tests._test_suite_modules_to_doctest
2883
def restore_names():
2884
tests._test_suite_testmod_names = original_testmod_names
2885
tests._test_suite_modules_to_doctest = orig_modules_to_doctest
2886
self.addCleanup(restore_names)
2887
tests._test_suite_testmod_names = _test_suite_testmod_names
2888
tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
3058
self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2889
3059
expected_test_list = [
2890
3060
# testmod_names
2891
3061
'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2892
3062
('bzrlib.tests.per_transport.TransportTests'
2893
3063
'.test_abspath(LocalTransport,LocalURLServer)'),
2894
3064
'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2895
# modules_to_doctest
2896
'bzrlib.timestamp.format_highres_date',
2897
3065
# plugins can't be tested that way since selftest may be run with
3068
if __doc__ is not None:
3069
expected_test_list.extend([
3070
# modules_to_doctest
3071
'bzrlib.timestamp.format_highres_date',
2900
3073
suite = tests.test_suite()
2901
3074
self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
3042
3213
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3216
class TestThreadLeakDetection(tests.TestCase):
3217
"""Ensure when tests leak threads we detect and report it"""
3219
class LeakRecordingResult(tests.ExtendedTestResult):
3221
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3223
def _report_thread_leak(self, test, leaks, alive):
3224
self.leaks.append((test, leaks))
3226
def test_testcase_without_addCleanups(self):
3227
"""Check old TestCase instances don't break with leak detection"""
3228
class Test(unittest.TestCase):
3231
addCleanup = None # for when on Python 2.7 with native addCleanup
3232
result = self.LeakRecordingResult()
3234
self.assertIs(getattr(test, "addCleanup", None), None)
3235
result.startTestRun()
3237
result.stopTestRun()
3238
self.assertEqual(result._tests_leaking_threads_count, 0)
3239
self.assertEqual(result.leaks, [])
3241
def test_thread_leak(self):
3242
"""Ensure a thread that outlives the running of a test is reported
3244
Uses a thread that blocks on an event, and is started by the inner
3245
test case. As the thread outlives the inner case's run, it should be
3246
detected as a leak, but the event is then set so that the thread can
3247
be safely joined in cleanup so it's not leaked for real.
3249
event = threading.Event()
3250
thread = threading.Thread(name="Leaker", target=event.wait)
3251
class Test(tests.TestCase):
3252
def test_leak(self):
3254
result = self.LeakRecordingResult()
3255
test = Test("test_leak")
3256
self.addCleanup(thread.join)
3257
self.addCleanup(event.set)
3258
result.startTestRun()
3260
result.stopTestRun()
3261
self.assertEqual(result._tests_leaking_threads_count, 1)
3262
self.assertEqual(result._first_thread_leaker_id, test.id())
3263
self.assertEqual(result.leaks, [(test, set([thread]))])
3264
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3266
def test_multiple_leaks(self):
3267
"""Check multiple leaks are blamed on the test cases at fault
3269
Same concept as the previous test, but has one inner test method that
3270
leaks two threads, and one that doesn't leak at all.
3272
event = threading.Event()
3273
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3274
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3275
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3276
class Test(tests.TestCase):
3277
def test_first_leak(self):
3279
def test_second_no_leak(self):
3281
def test_third_leak(self):
3284
result = self.LeakRecordingResult()
3285
first_test = Test("test_first_leak")
3286
third_test = Test("test_third_leak")
3287
self.addCleanup(thread_a.join)
3288
self.addCleanup(thread_b.join)
3289
self.addCleanup(thread_c.join)
3290
self.addCleanup(event.set)
3291
result.startTestRun()
3293
[first_test, Test("test_second_no_leak"), third_test]
3295
result.stopTestRun()
3296
self.assertEqual(result._tests_leaking_threads_count, 2)
3297
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3298
self.assertEqual(result.leaks, [
3299
(first_test, set([thread_b])),
3300
(third_test, set([thread_a, thread_c]))])
3301
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3045
3304
class TestRunSuite(tests.TestCase):
3047
3306
def test_runner_class(self):