603
641
def test_dangling_locks_cause_failures(self):
604
642
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
643
def test_function(self):
606
t = self.get_transport('.')
644
t = self.get_transport_from_path('.')
607
645
l = lockdir.LockDir(t, 'lock')
610
648
test = TestDanglingLock('test_function')
611
649
result = test.run()
650
total_failures = result.errors + result.failures
612
651
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
652
self.assertEqual(1, len(total_failures))
615
654
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
656
self.assertEqual(0, len(total_failures))
620
659
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
660
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
662
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
663
from ..transport.readonly import ReadonlyTransportDecorator
626
664
self.vfs_transport_factory = memory.MemoryServer
627
665
self.transport_readonly_server = None
628
666
# calling get_readonly_transport() constructs a decorator on the url
630
668
url = self.get_readonly_url()
631
669
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
670
t = transport.get_transport_from_url(url)
671
t2 = transport.get_transport_from_url(url2)
672
self.assertIsInstance(t, ReadonlyTransportDecorator)
673
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
674
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
676
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
677
from .http_server import HttpServer
678
from ..transport.http import HttpTransportBase
642
679
self.transport_server = test_server.LocalURLServer
643
680
self.transport_readonly_server = HttpServer
644
681
# calling get_readonly_transport() gives us a HTTP server instance.
645
682
url = self.get_readonly_url()
646
683
url2 = self.get_readonly_url('foo/bar')
647
684
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
685
t = transport.get_transport_from_url(url)
686
t2 = transport.get_transport_from_url(url2)
687
self.assertIsInstance(t, HttpTransportBase)
688
self.assertIsInstance(t2, HttpTransportBase)
652
689
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
691
def test_is_directory(self):
1643
1669
class Test(tests.TestCase):
1645
1671
def setUp(self):
1646
tests.TestCase.setUp(self)
1672
super(Test, self).setUp()
1647
1673
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1675
def test_value(self):
1650
1676
self.assertEqual('original', self.orig)
1651
1677
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1679
self._run_successful_test(Test('test_value'))
1655
1680
self.assertEqual('original', obj.test_attr)
1682
def test_overrideAttr_with_no_existing_value_and_value(self):
1683
# Do not define the test_attribute
1684
obj = self # Make 'obj' visible to the embedded test
1685
class Test(tests.TestCase):
1688
tests.TestCase.setUp(self)
1689
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1691
def test_value(self):
1692
self.assertEqual(tests._unitialized_attr, self.orig)
1693
self.assertEqual('modified', obj.test_attr)
1695
self._run_successful_test(Test('test_value'))
1696
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1698
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1699
# Do not define the test_attribute
1700
obj = self # Make 'obj' visible to the embedded test
1701
class Test(tests.TestCase):
1704
tests.TestCase.setUp(self)
1705
self.orig = self.overrideAttr(obj, 'test_attr')
1707
def test_value(self):
1708
self.assertEqual(tests._unitialized_attr, self.orig)
1709
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1711
self._run_successful_test(Test('test_value'))
1712
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1714
def test_recordCalls(self):
1715
from breezy.tests import test_selftest
1716
calls = self.recordCalls(
1717
test_selftest, '_add_numbers')
1718
self.assertEqual(test_selftest._add_numbers(2, 10),
1720
self.assertEqual(calls, [((2, 10), {})])
1723
def _add_numbers(a, b):
1727
class _MissingFeature(features.Feature):
1730
missing_feature = _MissingFeature()
1733
def _get_test(name):
1734
"""Get an instance of a specific example test.
1736
We protect this in a function so that they don't auto-run in the test
1740
class ExampleTests(tests.TestCase):
1742
def test_fail(self):
1743
mutter('this was a failing test')
1744
self.fail('this test will fail')
1746
def test_error(self):
1747
mutter('this test errored')
1748
raise RuntimeError('gotcha')
1750
def test_missing_feature(self):
1751
mutter('missing the feature')
1752
self.requireFeature(missing_feature)
1754
def test_skip(self):
1755
mutter('this test will be skipped')
1756
raise tests.TestSkipped('reason')
1758
def test_success(self):
1759
mutter('this test succeeds')
1761
def test_xfail(self):
1762
mutter('test with expected failure')
1763
self.knownFailure('this_fails')
1765
def test_unexpected_success(self):
1766
mutter('test with unexpected success')
1767
self.expectFailure('should_fail', lambda: None)
1769
return ExampleTests(name)
1772
def _get_skip_reasons(result):
1773
# GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
1774
return result.skip_reasons
1777
class TestTestCaseLogDetails(tests.TestCase):
1779
def _run_test(self, test_name):
1780
test = _get_test(test_name)
1781
result = testtools.TestResult()
1785
def test_fail_has_log(self):
1786
result = self._run_test('test_fail')
1787
self.assertEqual(1, len(result.failures))
1788
result_content = result.failures[0][1]
1789
self.assertContainsRe(result_content,
1790
'(?m)^(?:Text attachment: )?log(?:$|: )')
1791
self.assertContainsRe(result_content, 'this was a failing test')
1793
def test_error_has_log(self):
1794
result = self._run_test('test_error')
1795
self.assertEqual(1, len(result.errors))
1796
result_content = result.errors[0][1]
1797
self.assertContainsRe(result_content,
1798
'(?m)^(?:Text attachment: )?log(?:$|: )')
1799
self.assertContainsRe(result_content, 'this test errored')
1801
def test_skip_has_no_log(self):
1802
result = self._run_test('test_skip')
1803
reasons = _get_skip_reasons(result)
1804
self.assertEqual({'reason'}, set(reasons))
1805
skips = reasons['reason']
1806
self.assertEqual(1, len(skips))
1808
self.assertFalse('log' in test.getDetails())
1810
def test_missing_feature_has_no_log(self):
1811
# testtools doesn't know about addNotSupported, so it just gets
1812
# considered as a skip
1813
result = self._run_test('test_missing_feature')
1814
reasons = _get_skip_reasons(result)
1815
self.assertEqual({missing_feature}, set(reasons))
1816
skips = reasons[missing_feature]
1817
self.assertEqual(1, len(skips))
1819
self.assertFalse('log' in test.getDetails())
1821
def test_xfail_has_no_log(self):
1822
result = self._run_test('test_xfail')
1823
self.assertEqual(1, len(result.expectedFailures))
1824
result_content = result.expectedFailures[0][1]
1825
self.assertNotContainsRe(result_content,
1826
'(?m)^(?:Text attachment: )?log(?:$|: )')
1827
self.assertNotContainsRe(result_content, 'test with expected failure')
1829
def test_unexpected_success_has_log(self):
1830
result = self._run_test('test_unexpected_success')
1831
self.assertEqual(1, len(result.unexpectedSuccesses))
1832
# Inconsistency, unexpectedSuccesses is a list of tests,
1833
# expectedFailures is a list of reasons?
1834
test = result.unexpectedSuccesses[0]
1835
details = test.getDetails()
1836
self.assertTrue('log' in details)
1839
class TestTestCloning(tests.TestCase):
1840
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1842
def test_cloned_testcase_does_not_share_details(self):
1843
"""A TestCase cloned with clone_test does not share mutable attributes
1844
such as details or cleanups.
1846
class Test(tests.TestCase):
1848
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1849
orig_test = Test('test_foo')
1850
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1851
orig_test.run(unittest.TestResult())
1852
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1853
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1855
def test_double_apply_scenario_preserves_first_scenario(self):
1856
"""Applying two levels of scenarios to a test preserves the attributes
1857
added by both scenarios.
1859
class Test(tests.TestCase):
1862
test = Test('test_foo')
1863
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1864
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1865
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1866
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1867
all_tests = list(tests.iter_suite_tests(suite))
1868
self.assertLength(4, all_tests)
1869
all_xys = sorted((t.x, t.y) for t in all_tests)
1870
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1873
# NB: Don't delete this; it's not actually from 0.11!
1659
1874
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2183
load_list='missing file name', list_only=True)
2186
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2188
_test_needs_features = [features.subunit]
2190
def run_subunit_stream(self, test_name):
2191
from subunit import ProtocolTestCase
2193
return TestUtil.TestSuite([_get_test(test_name)])
2194
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2195
test_suite_factory=factory)
2196
test = ProtocolTestCase(stream)
2197
result = testtools.TestResult()
2199
content = stream.getvalue()
2200
return content, result
2202
def test_fail_has_log(self):
2203
content, result = self.run_subunit_stream('test_fail')
2204
self.assertEqual(1, len(result.failures))
2205
self.assertContainsRe(content, '(?m)^log$')
2206
self.assertContainsRe(content, 'this test will fail')
2208
def test_error_has_log(self):
2209
content, result = self.run_subunit_stream('test_error')
2210
self.assertContainsRe(content, '(?m)^log$')
2211
self.assertContainsRe(content, 'this test errored')
2213
def test_skip_has_no_log(self):
2214
content, result = self.run_subunit_stream('test_skip')
2215
self.assertNotContainsRe(content, '(?m)^log$')
2216
self.assertNotContainsRe(content, 'this test will be skipped')
2217
reasons = _get_skip_reasons(result)
2218
self.assertEqual({'reason'}, set(reasons))
2219
skips = reasons['reason']
2220
self.assertEqual(1, len(skips))
2222
# RemotedTestCase doesn't preserve the "details"
2223
## self.assertFalse('log' in test.getDetails())
2225
def test_missing_feature_has_no_log(self):
2226
content, result = self.run_subunit_stream('test_missing_feature')
2227
self.assertNotContainsRe(content, '(?m)^log$')
2228
self.assertNotContainsRe(content, 'missing the feature')
2229
reasons = _get_skip_reasons(result)
2230
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2231
skips = reasons['_MissingFeature\n']
2232
self.assertEqual(1, len(skips))
2234
# RemotedTestCase doesn't preserve the "details"
2235
## self.assertFalse('log' in test.getDetails())
2237
def test_xfail_has_no_log(self):
2238
content, result = self.run_subunit_stream('test_xfail')
2239
self.assertNotContainsRe(content, '(?m)^log$')
2240
self.assertNotContainsRe(content, 'test with expected failure')
2241
self.assertEqual(1, len(result.expectedFailures))
2242
result_content = result.expectedFailures[0][1]
2243
self.assertNotContainsRe(result_content,
2244
'(?m)^(?:Text attachment: )?log(?:$|: )')
2245
self.assertNotContainsRe(result_content, 'test with expected failure')
2247
def test_unexpected_success_has_log(self):
2248
content, result = self.run_subunit_stream('test_unexpected_success')
2249
self.assertContainsRe(content, '(?m)^log$')
2250
self.assertContainsRe(content, 'test with unexpected success')
2251
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2252
# success, if a min version check is added remove this
2253
from subunit import TestProtocolClient as _Client
2254
if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
2255
self.expectFailure('subunit treats "unexpectedSuccess"'
2256
' as a plain success',
2257
self.assertEqual, 1, len(result.unexpectedSuccesses))
2258
self.assertEqual(1, len(result.unexpectedSuccesses))
2259
test = result.unexpectedSuccesses[0]
2260
# RemotedTestCase doesn't preserve the "details"
2261
## self.assertTrue('log' in test.getDetails())
2263
def test_success_has_no_log(self):
2264
content, result = self.run_subunit_stream('test_success')
2265
self.assertEqual(1, result.testsRun)
2266
self.assertNotContainsRe(content, '(?m)^log$')
2267
self.assertNotContainsRe(content, 'this test succeeds')
1974
2270
class TestRunBzr(tests.TestCase):
2264
2560
class TestStartBzrSubProcess(tests.TestCase):
2561
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2563
def _subprocess_log_cleanup(self):
2564
"""Inhibits the base version as we don't produce a log file."""
2269
2566
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2567
"""Override the base version to record the command that is run.
2569
From there we can ensure it is correct without spawning a real process.
2271
2571
self.check_popen_state()
2272
2572
self._popen_args = args
2273
2573
self._popen_kwargs = kwargs
2274
2574
raise _DontSpawnProcess()
2576
def check_popen_state(self):
2577
"""Replace to make assertions when popen is called."""
2276
2579
def test_run_bzr_subprocess_no_plugins(self):
2277
2580
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2581
command = self._popen_args[0]
2279
2582
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2583
self.assertEqual(self.get_brz_path(), command[1])
2281
2584
self.assertEqual(['--no-plugins'], command[2:])
2283
2586
def test_allow_plugins(self):
2284
2587
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2589
command = self._popen_args[0]
2287
2590
self.assertEqual([], command[2:])
2289
2592
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2593
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2594
# set in the child
2292
2595
def check_environment():
2293
2596
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2597
self.check_popen_state = check_environment
2295
2598
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2599
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2297
2600
# not set in theparent
2298
2601
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2603
def test_run_bzr_subprocess_env_del(self):
2301
2604
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2605
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2606
def check_environment():
2304
2607
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2608
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2609
self.check_popen_state = check_environment
2307
2610
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2611
env_changes={'EXISTANT_ENV_VAR':None})
2309
2612
# Still set in parent
2310
2613
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2614
del os.environ['EXISTANT_ENV_VAR']
2313
2616
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2617
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2618
def check_environment():
2316
2619
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2620
self.check_popen_state = check_environment
2318
2621
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2622
env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2624
def test_working_dir(self):
2322
2625
"""Test that we can specify the working dir for the child"""
2354
2656
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
2658
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2659
self.assertEqual('brz: interrupted\n', result[1])
2434
2662
class TestSelftestFiltering(tests.TestCase):
2436
2664
def setUp(self):
2437
tests.TestCase.setUp(self)
2665
super(TestSelftestFiltering, self).setUp()
2438
2666
self.suite = TestUtil.TestSuite()
2439
2667
self.loader = TestUtil.TestLoader()
2440
2668
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2669
sys.modules['breezy.tests.test_selftest']))
2442
2670
self.all_names = _test_ids(self.suite)
2444
2672
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2673
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2446
2674
'test_condition_id_re')
2447
2675
filtered_suite = tests.filter_suite_by_condition(
2448
2676
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2677
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2679
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2680
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2681
'test_condition_id_in_list']
2454
2682
id_list = tests.TestIdList(test_names)
2455
2683
filtered_suite = tests.filter_suite_by_condition(
2953
3183
def test_predefined_prefixes(self):
2954
3184
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3185
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3186
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3187
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3188
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3189
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3190
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3193
class TestThreadLeakDetection(tests.TestCase):
3194
"""Ensure when tests leak threads we detect and report it"""
3196
class LeakRecordingResult(tests.ExtendedTestResult):
3198
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3200
def _report_thread_leak(self, test, leaks, alive):
3201
self.leaks.append((test, leaks))
3203
def test_testcase_without_addCleanups(self):
3204
"""Check old TestCase instances don't break with leak detection"""
3205
class Test(unittest.TestCase):
3208
result = self.LeakRecordingResult()
3210
result.startTestRun()
3212
result.stopTestRun()
3213
self.assertEqual(result._tests_leaking_threads_count, 0)
3214
self.assertEqual(result.leaks, [])
3216
def test_thread_leak(self):
3217
"""Ensure a thread that outlives the running of a test is reported
3219
Uses a thread that blocks on an event, and is started by the inner
3220
test case. As the thread outlives the inner case's run, it should be
3221
detected as a leak, but the event is then set so that the thread can
3222
be safely joined in cleanup so it's not leaked for real.
3224
event = threading.Event()
3225
thread = threading.Thread(name="Leaker", target=event.wait)
3226
class Test(tests.TestCase):
3227
def test_leak(self):
3229
result = self.LeakRecordingResult()
3230
test = Test("test_leak")
3231
self.addCleanup(thread.join)
3232
self.addCleanup(event.set)
3233
result.startTestRun()
3235
result.stopTestRun()
3236
self.assertEqual(result._tests_leaking_threads_count, 1)
3237
self.assertEqual(result._first_thread_leaker_id, test.id())
3238
self.assertEqual(result.leaks, [(test, {thread})])
3239
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3241
def test_multiple_leaks(self):
3242
"""Check multiple leaks are blamed on the test cases at fault
3244
Same concept as the previous test, but has one inner test method that
3245
leaks two threads, and one that doesn't leak at all.
3247
event = threading.Event()
3248
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3249
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3250
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3251
class Test(tests.TestCase):
3252
def test_first_leak(self):
3254
def test_second_no_leak(self):
3256
def test_third_leak(self):
3259
result = self.LeakRecordingResult()
3260
first_test = Test("test_first_leak")
3261
third_test = Test("test_third_leak")
3262
self.addCleanup(thread_a.join)
3263
self.addCleanup(thread_b.join)
3264
self.addCleanup(thread_c.join)
3265
self.addCleanup(event.set)
3266
result.startTestRun()
3268
[first_test, Test("test_second_no_leak"), third_test]
3270
result.stopTestRun()
3271
self.assertEqual(result._tests_leaking_threads_count, 2)
3272
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3273
self.assertEqual(result.leaks, [
3274
(first_test, {thread_b}),
3275
(third_test, {thread_a, thread_c})])
3276
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3279
class TestPostMortemDebugging(tests.TestCase):
3280
"""Check post mortem debugging works when tests fail or error"""
3282
class TracebackRecordingResult(tests.ExtendedTestResult):
3284
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3285
self.postcode = None
3286
def _post_mortem(self, tb=None):
3287
"""Record the code object at the end of the current traceback"""
3288
tb = tb or sys.exc_info()[2]
3291
while next is not None:
3294
self.postcode = tb.tb_frame.f_code
3295
def report_error(self, test, err):
3297
def report_failure(self, test, err):
3300
def test_location_unittest_error(self):
3301
"""Needs right post mortem traceback with erroring unittest case"""
3302
class Test(unittest.TestCase):
3305
result = self.TracebackRecordingResult()
3307
self.assertEqual(result.postcode, Test.runTest.__code__)
3309
def test_location_unittest_failure(self):
3310
"""Needs right post mortem traceback with failing unittest case"""
3311
class Test(unittest.TestCase):
3313
raise self.failureException
3314
result = self.TracebackRecordingResult()
3316
self.assertEqual(result.postcode, Test.runTest.__code__)
3318
def test_location_bt_error(self):
3319
"""Needs right post mortem traceback with erroring breezy.tests case"""
3320
class Test(tests.TestCase):
3321
def test_error(self):
3323
result = self.TracebackRecordingResult()
3324
Test("test_error").run(result)
3325
self.assertEqual(result.postcode, Test.test_error.__code__)
3327
def test_location_bt_failure(self):
3328
"""Needs right post mortem traceback with failing breezy.tests case"""
3329
class Test(tests.TestCase):
3330
def test_failure(self):
3331
raise self.failureException
3332
result = self.TracebackRecordingResult()
3333
Test("test_failure").run(result)
3334
self.assertEqual(result.postcode, Test.test_failure.__code__)
3336
def test_env_var_triggers_post_mortem(self):
3337
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3339
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3340
post_mortem_calls = []
3341
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3342
self.overrideEnv('BRZ_TEST_PDB', None)
3343
result._post_mortem(1)
3344
self.overrideEnv('BRZ_TEST_PDB', 'on')
3345
result._post_mortem(2)
3346
self.assertEqual([2], post_mortem_calls)
2963
3349
class TestRunSuite(tests.TestCase):
2976
3362
self.verbosity)
2977
3363
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3364
self.assertLength(1, calls)
3367
class _Selftest(object):
3368
"""Mixin for tests needing full selftest output"""
3370
def _inject_stream_into_subunit(self, stream):
3371
"""To be overridden by subclasses that run tests out of process"""
3373
def _run_selftest(self, **kwargs):
3375
self._inject_stream_into_subunit(sio)
3376
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3377
return sio.getvalue()
3380
class _ForkedSelftest(_Selftest):
3381
"""Mixin for tests needing full selftest output with forked children"""
3383
_test_needs_features = [features.subunit]
3385
def _inject_stream_into_subunit(self, stream):
3386
"""Monkey-patch subunit so the extra output goes to stream not stdout
3388
Some APIs need rewriting so this kind of bogus hackery can be replaced
3389
by passing the stream param from run_tests down into ProtocolTestCase.
3391
from subunit import ProtocolTestCase
3392
_original_init = ProtocolTestCase.__init__
3393
def _init_with_passthrough(self, *args, **kwargs):
3394
_original_init(self, *args, **kwargs)
3395
self._passthrough = stream
3396
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3398
def _run_selftest(self, **kwargs):
3399
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3400
if getattr(os, "fork", None) is None:
3401
raise tests.TestNotApplicable("Platform doesn't support forking")
3402
# Make sure the fork code is actually invoked by claiming two cores
3403
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3404
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3405
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3408
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3409
"""Check operation of --parallel=fork selftest option"""
3411
def test_error_in_child_during_fork(self):
3412
"""Error in a forked child during test setup should get reported"""
3413
class Test(tests.TestCase):
3414
def testMethod(self):
3416
# We don't care what, just break something that a child will run
3417
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3418
out = self._run_selftest(test_suite_factory=Test)
3419
# Lines from the tracebacks of the two child processes may be mixed
3420
# together due to the way subunit parses and forwards the streams,
3421
# so permit extra lines between each part of the error output.
3422
self.assertContainsRe(out,
3425
".+ in fork_for_tests\n"
3427
"\s*workaround_zealous_crypto_random\(\)\n"
3432
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3433
"""Check a test case still alive after being run emits a warning"""
3435
class Test(tests.TestCase):
3436
def test_pass(self):
3438
def test_self_ref(self):
3439
self.also_self = self.test_self_ref
3440
def test_skip(self):
3441
self.skipTest("Don't need")
3443
def _get_suite(self):
3444
return TestUtil.TestSuite([
3445
self.Test("test_pass"),
3446
self.Test("test_self_ref"),
3447
self.Test("test_skip"),
3450
def _run_selftest_with_suite(self, **kwargs):
3451
old_flags = tests.selftest_debug_flags
3452
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3453
gc_on = gc.isenabled()
3457
output = self._run_selftest(test_suite_factory=self._get_suite,
3462
tests.selftest_debug_flags = old_flags
3463
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3464
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3467
def test_testsuite(self):
3468
self._run_selftest_with_suite()
3470
def test_pattern(self):
3471
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3472
self.assertNotContainsRe(out, "test_skip")
3474
def test_exclude_pattern(self):
3475
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3476
self.assertNotContainsRe(out, "test_skip")
3478
def test_random_seed(self):
3479
self._run_selftest_with_suite(random_seed="now")
3481
def test_matching_tests_first(self):
3482
self._run_selftest_with_suite(matching_tests_first=True,
3483
pattern="test_self_ref$")
3485
def test_starting_with_and_exclude(self):
3486
out = self._run_selftest_with_suite(starting_with=["bt."],
3487
exclude_pattern="test_skip$")
3488
self.assertNotContainsRe(out, "test_skip")
3490
def test_additonal_decorator(self):
3491
out = self._run_selftest_with_suite(
3492
suite_decorators=[tests.TestDecorator])
3495
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3496
"""Check warnings from tests staying alive are emitted with subunit"""
3498
_test_needs_features = [features.subunit]
3500
def _run_selftest_with_suite(self, **kwargs):
3501
return TestUncollectedWarnings._run_selftest_with_suite(self,
3502
runner_class=tests.SubUnitBzrRunner, **kwargs)
3505
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3506
"""Check warnings from tests staying alive are emitted when forking"""
3509
class TestEnvironHandling(tests.TestCase):
3511
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3512
self.assertFalse('MYVAR' in os.environ)
3513
self.overrideEnv('MYVAR', '42')
3514
# We use an embedded test to make sure we fix the _captureVar bug
3515
class Test(tests.TestCase):
3517
# The first call save the 42 value
3518
self.overrideEnv('MYVAR', None)
3519
self.assertEqual(None, os.environ.get('MYVAR'))
3520
# Make sure we can call it twice
3521
self.overrideEnv('MYVAR', None)
3522
self.assertEqual(None, os.environ.get('MYVAR'))
3524
result = tests.TextTestResult(output, 0, 1)
3525
Test('test_me').run(result)
3526
if not result.wasStrictlySuccessful():
3527
self.fail(output.getvalue())
3528
# We get our value back
3529
self.assertEqual('42', os.environ.get('MYVAR'))
3532
class TestIsolatedEnv(tests.TestCase):
3533
"""Test isolating tests from os.environ.
3535
Since we use tests that are already isolated from os.environ a bit of care
3536
should be taken when designing the tests to avoid bootstrap side-effects.
3537
The tests start an already clean os.environ which allow doing valid
3538
assertions about which variables are present or not and design tests around
3542
class ScratchMonkey(tests.TestCase):
3547
def test_basics(self):
3548
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3549
# for tests.TestCase.
3550
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3551
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3552
# Being part of isolated_environ, BRZ_HOME should not appear here
3553
self.assertFalse('BRZ_HOME' in os.environ)
3554
# Make sure we know the definition of LINES: part of os.environ for
3556
self.assertTrue('LINES' in tests.isolated_environ)
3557
self.assertEqual('25', tests.isolated_environ['LINES'])
3558
self.assertEqual('25', os.environ['LINES'])
3560
def test_injecting_unknown_variable(self):
3561
# BRZ_HOME is known to be absent from os.environ
3562
test = self.ScratchMonkey('test_me')
3563
tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
3564
self.assertEqual('foo', os.environ['BRZ_HOME'])
3565
tests.restore_os_environ(test)
3566
self.assertFalse('BRZ_HOME' in os.environ)
3568
def test_injecting_known_variable(self):
3569
test = self.ScratchMonkey('test_me')
3570
# LINES is known to be present in os.environ
3571
tests.override_os_environ(test, {'LINES': '42'})
3572
self.assertEqual('42', os.environ['LINES'])
3573
tests.restore_os_environ(test)
3574
self.assertEqual('25', os.environ['LINES'])
3576
def test_deleting_variable(self):
3577
test = self.ScratchMonkey('test_me')
3578
# LINES is known to be present in os.environ
3579
tests.override_os_environ(test, {'LINES': None})
3580
self.assertTrue('LINES' not in os.environ)
3581
tests.restore_os_environ(test)
3582
self.assertEqual('25', os.environ['LINES'])
3585
class TestDocTestSuiteIsolation(tests.TestCase):
3586
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3588
Since tests.TestCase alreay provides an isolation from os.environ, we use
3589
the clean environment as a base for testing. To precisely capture the
3590
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3593
We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
3594
not `os.environ` so each test overrides it to suit its needs.
3598
def get_doctest_suite_for_string(self, klass, string):
3599
class Finder(doctest.DocTestFinder):
3601
def find(*args, **kwargs):
3602
test = doctest.DocTestParser().get_doctest(
3603
string, {}, 'foo', 'foo.py', 0)
3606
suite = klass(test_finder=Finder())
3609
def run_doctest_suite_for_string(self, klass, string):
3610
suite = self.get_doctest_suite_for_string(klass, string)
3612
result = tests.TextTestResult(output, 0, 1)
3614
return result, output
3616
def assertDocTestStringSucceds(self, klass, string):
3617
result, output = self.run_doctest_suite_for_string(klass, string)
3618
if not result.wasStrictlySuccessful():
3619
self.fail(output.getvalue())
3621
def assertDocTestStringFails(self, klass, string):
3622
result, output = self.run_doctest_suite_for_string(klass, string)
3623
if result.wasStrictlySuccessful():
3624
self.fail(output.getvalue())
3626
def test_injected_variable(self):
3627
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3630
>>> os.environ['LINES']
3633
# doctest.DocTestSuite fails as it sees '25'
3634
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3635
# tests.DocTestSuite sees '42'
3636
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3638
def test_deleted_variable(self):
3639
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3642
>>> os.environ.get('LINES')
3644
# doctest.DocTestSuite fails as it sees '25'
3645
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3646
# tests.DocTestSuite sees None
3647
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3650
class TestSelftestExcludePatterns(tests.TestCase):
3653
super(TestSelftestExcludePatterns, self).setUp()
3654
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3656
def suite_factory(self, keep_only=None, starting_with=None):
3657
"""A test suite factory with only a few tests."""
3658
class Test(tests.TestCase):
3660
# We don't need the full class path
3661
return self._testMethodName
3668
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3670
def assertTestList(self, expected, *selftest_args):
3671
# We rely on setUp installing the right test suite factory so we can
3672
# test at the command level without loading the whole test suite
3673
out, err = self.run_bzr(('selftest', '--list') + selftest_args)
3674
actual = out.splitlines()
3675
self.assertEqual(expected, actual)
3677
def test_full_list(self):
3678
self.assertTestList(['a', 'b', 'c'])
3680
def test_single_exclude(self):
3681
self.assertTestList(['b', 'c'], '-x', 'a')
3683
def test_mutiple_excludes(self):
3684
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3687
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3689
_test_needs_features = [features.subunit]
3692
super(TestCounterHooks, self).setUp()
3693
class Test(tests.TestCase):
3696
super(Test, self).setUp()
3697
self.hooks = hooks.Hooks()
3698
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3699
self.install_counter_hook(self.hooks, 'myhook')
3704
def run_hook_once(self):
3705
for hook in self.hooks['myhook']:
3708
self.test_class = Test
3710
def assertHookCalls(self, expected_calls, test_name):
3711
test = self.test_class(test_name)
3712
result = unittest.TestResult()
3714
self.assertTrue(hasattr(test, '_counters'))
3715
self.assertTrue('myhook' in test._counters)
3716
self.assertEqual(expected_calls, test._counters['myhook'])
3718
def test_no_hook(self):
3719
self.assertHookCalls(0, 'no_hook')
3721
def test_run_hook_once(self):
3722
tt = features.testtools
3723
if tt.module.__version__ < (0, 9, 8):
3724
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3725
self.assertHookCalls(1, 'run_hook_once')