603
639
def test_dangling_locks_cause_failures(self):
604
640
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
641
def test_function(self):
606
t = self.get_transport('.')
642
t = self.get_transport_from_path('.')
607
643
l = lockdir.LockDir(t, 'lock')
610
646
test = TestDanglingLock('test_function')
611
647
result = test.run()
648
total_failures = result.errors + result.failures
612
649
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
650
self.assertEqual(1, len(total_failures))
615
652
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
654
self.assertEqual(0, len(total_failures))
620
657
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
658
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
660
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
661
from ..transport.readonly import ReadonlyTransportDecorator
626
662
self.vfs_transport_factory = memory.MemoryServer
627
663
self.transport_readonly_server = None
628
664
# calling get_readonly_transport() constructs a decorator on the url
630
666
url = self.get_readonly_url()
631
667
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
668
t = transport.get_transport_from_url(url)
669
t2 = transport.get_transport_from_url(url2)
670
self.assertIsInstance(t, ReadonlyTransportDecorator)
671
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
672
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
674
def test_get_readonly_url_http(self):
    """Readonly URLs served by HttpServer resolve to HTTP transports."""
    from .http_server import HttpServer
    from ..transport.http import HttpTransportBase
    self.transport_server = test_server.LocalURLServer
    self.transport_readonly_server = HttpServer
    # calling get_readonly_transport() gives us a HTTP server instance.
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    # the transport returned may be any HttpTransportBase subclass
    t = transport.get_transport_from_url(url)
    t2 = transport.get_transport_from_url(url2)
    self.assertIsInstance(t, HttpTransportBase)
    self.assertIsInstance(t2, HttpTransportBase)
    # url2 must address the same location as 'foo/bar' relative to url.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
689
def test_is_directory(self):
1643
1667
class Test(tests.TestCase):
1645
1669
def setUp(self):
1646
tests.TestCase.setUp(self)
1670
super(Test, self).setUp()
1647
1671
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1673
def test_value(self):
1650
1674
self.assertEqual('original', self.orig)
1651
1675
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1677
self._run_successful_test(Test('test_value'))
1655
1678
self.assertEqual('original', obj.test_attr)
1680
def test_overrideAttr_with_no_existing_value_and_value(self):
1681
# Do not define the test_attribute
1682
obj = self # Make 'obj' visible to the embedded test
1683
class Test(tests.TestCase):
1686
tests.TestCase.setUp(self)
1687
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1689
def test_value(self):
1690
self.assertEqual(tests._unitialized_attr, self.orig)
1691
self.assertEqual('modified', obj.test_attr)
1693
self._run_successful_test(Test('test_value'))
1694
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1696
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1697
# Do not define the test_attribute
1698
obj = self # Make 'obj' visible to the embedded test
1699
class Test(tests.TestCase):
1702
tests.TestCase.setUp(self)
1703
self.orig = self.overrideAttr(obj, 'test_attr')
1705
def test_value(self):
1706
self.assertEqual(tests._unitialized_attr, self.orig)
1707
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1709
self._run_successful_test(Test('test_value'))
1710
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1712
def test_recordCalls(self):
1713
from breezy.tests import test_selftest
1714
calls = self.recordCalls(
1715
test_selftest, '_add_numbers')
1716
self.assertEqual(test_selftest._add_numbers(2, 10),
1718
self.assertEqual(calls, [((2, 10), {})])
1721
def _add_numbers(a, b):
1725
class _MissingFeature(features.Feature):
1728
missing_feature = _MissingFeature()
1731
def _get_test(name):
1732
"""Get an instance of a specific example test.
1734
We protect this in a function so that they don't auto-run in the test
1738
class ExampleTests(tests.TestCase):
1740
def test_fail(self):
1741
mutter('this was a failing test')
1742
self.fail('this test will fail')
1744
def test_error(self):
1745
mutter('this test errored')
1746
raise RuntimeError('gotcha')
1748
def test_missing_feature(self):
1749
mutter('missing the feature')
1750
self.requireFeature(missing_feature)
1752
def test_skip(self):
1753
mutter('this test will be skipped')
1754
raise tests.TestSkipped('reason')
1756
def test_success(self):
1757
mutter('this test succeeds')
1759
def test_xfail(self):
1760
mutter('test with expected failure')
1761
self.knownFailure('this_fails')
1763
def test_unexpected_success(self):
1764
mutter('test with unexpected success')
1765
self.expectFailure('should_fail', lambda: None)
1767
return ExampleTests(name)
1770
def _get_skip_reasons(result):
1771
# GZ 2017-06-06: Newer testtools doesn't have this, uses detail instead
1772
return result.skip_reasons
1775
class TestTestCaseLogDetails(tests.TestCase):
1777
def _run_test(self, test_name):
1778
test = _get_test(test_name)
1779
result = testtools.TestResult()
1783
def test_fail_has_log(self):
    """A failing test carries a log section with its mutter() output."""
    result = self._run_test('test_fail')
    self.assertEqual(1, len(result.failures))
    # Each failure entry is indexed as (test, text); inspect the text.
    result_content = result.failures[0][1]
    self.assertContainsRe(result_content,
        '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(result_content, 'this was a failing test')
1791
def test_error_has_log(self):
    """An erroring test carries a log section with its mutter() output."""
    result = self._run_test('test_error')
    self.assertEqual(1, len(result.errors))
    # Each error entry is indexed as (test, text); inspect the text.
    result_content = result.errors[0][1]
    self.assertContainsRe(result_content,
        '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(result_content, 'this test errored')
1799
def test_skip_has_no_log(self):
1800
result = self._run_test('test_skip')
1801
reasons = _get_skip_reasons(result)
1802
self.assertEqual({'reason'}, set(reasons))
1803
skips = reasons['reason']
1804
self.assertEqual(1, len(skips))
1806
self.assertFalse('log' in test.getDetails())
1808
def test_missing_feature_has_no_log(self):
1809
# testtools doesn't know about addNotSupported, so it just gets
1810
# considered as a skip
1811
result = self._run_test('test_missing_feature')
1812
reasons = _get_skip_reasons(result)
1813
self.assertEqual({missing_feature}, set(reasons))
1814
skips = reasons[missing_feature]
1815
self.assertEqual(1, len(skips))
1817
self.assertFalse('log' in test.getDetails())
1819
def test_xfail_has_no_log(self):
    """An expected failure is reported without any log section."""
    result = self._run_test('test_xfail')
    self.assertEqual(1, len(result.expectedFailures))
    result_content = result.expectedFailures[0][1]
    self.assertNotContainsRe(result_content,
        '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(result_content, 'test with expected failure')
1827
def test_unexpected_success_has_log(self):
    """An unexpected success keeps 'log' among the test's details."""
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    self.assertTrue('log' in details)
1837
class TestTestCloning(tests.TestCase):
1838
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1840
def test_cloned_testcase_does_not_share_details(self):
1841
"""A TestCase cloned with clone_test does not share mutable attributes
1842
such as details or cleanups.
1844
class Test(tests.TestCase):
1846
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1847
orig_test = Test('test_foo')
1848
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1849
orig_test.run(unittest.TestResult())
1850
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1851
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1853
def test_double_apply_scenario_preserves_first_scenario(self):
1854
"""Applying two levels of scenarios to a test preserves the attributes
1855
added by both scenarios.
1857
class Test(tests.TestCase):
1860
test = Test('test_foo')
1861
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1862
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1863
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1864
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1865
all_tests = list(tests.iter_suite_tests(suite))
1866
self.assertLength(4, all_tests)
1867
all_xys = sorted((t.x, t.y) for t in all_tests)
1868
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1871
# NB: Don't delete this; it's not actually from 0.11!
1659
1872
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2181
load_list='missing file name', list_only=True)
2184
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2186
_test_needs_features = [features.subunit]
2188
def run_subunit_stream(self, test_name):
2189
from subunit import ProtocolTestCase
2191
return TestUtil.TestSuite([_get_test(test_name)])
2192
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2193
test_suite_factory=factory)
2194
test = ProtocolTestCase(stream)
2195
result = testtools.TestResult()
2197
content = stream.getvalue()
2198
return content, result
2200
def test_fail_has_log(self):
    """The subunit stream for a failing test includes a log section."""
    content, result = self.run_subunit_stream('test_fail')
    self.assertEqual(1, len(result.failures))
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'this test will fail')
2206
def test_error_has_log(self):
    """The subunit stream for an erroring test includes a log section."""
    content, result = self.run_subunit_stream('test_error')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'this test errored')
2211
def test_skip_has_no_log(self):
2212
content, result = self.run_subunit_stream('test_skip')
2213
self.assertNotContainsRe(content, '(?m)^log$')
2214
self.assertNotContainsRe(content, 'this test will be skipped')
2215
reasons = _get_skip_reasons(result)
2216
self.assertEqual({'reason'}, set(reasons))
2217
skips = reasons['reason']
2218
self.assertEqual(1, len(skips))
2220
# RemotedTestCase doesn't preserve the "details"
2221
## self.assertFalse('log' in test.getDetails())
2223
def test_missing_feature_has_no_log(self):
2224
content, result = self.run_subunit_stream('test_missing_feature')
2225
self.assertNotContainsRe(content, '(?m)^log$')
2226
self.assertNotContainsRe(content, 'missing the feature')
2227
reasons = _get_skip_reasons(result)
2228
self.assertEqual({'_MissingFeature\n'}, set(reasons))
2229
skips = reasons['_MissingFeature\n']
2230
self.assertEqual(1, len(skips))
2232
# RemotedTestCase doesn't preserve the "details"
2233
## self.assertFalse('log' in test.getDetails())
2235
def test_xfail_has_no_log(self):
    """An expected failure adds no log to the stream or to the result."""
    content, result = self.run_subunit_stream('test_xfail')
    # Nothing about the log in the raw stream...
    self.assertNotContainsRe(content, '(?m)^log$')
    self.assertNotContainsRe(content, 'test with expected failure')
    # ...nor in the text recorded for the expected failure.
    self.assertEqual(1, len(result.expectedFailures))
    result_content = result.expectedFailures[0][1]
    self.assertNotContainsRe(result_content,
        '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(result_content, 'test with expected failure')
2245
def test_unexpected_success_has_log(self):
    """The subunit stream for an unexpected success includes the log."""
    content, result = self.run_subunit_stream('test_unexpected_success')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'test with unexpected success')
    # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
    #                success, if a min version check is added remove this
    from subunit import TestProtocolClient as _Client
    # NOTE(review): ``__func__`` exists on Python 2 unbound methods only;
    # confirm which interpreter this check is expected to run under.
    if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
    self.assertEqual(1, len(result.unexpectedSuccesses))
    test = result.unexpectedSuccesses[0]
    # RemotedTestCase doesn't preserve the "details"
    ## self.assertTrue('log' in test.getDetails())
2261
def test_success_has_no_log(self):
    """A passing test contributes no log to the subunit stream."""
    content, result = self.run_subunit_stream('test_success')
    self.assertEqual(1, result.testsRun)
    self.assertNotContainsRe(content, '(?m)^log$')
    self.assertNotContainsRe(content, 'this test succeeds')
1974
2268
class TestRunBzr(tests.TestCase):
2264
2558
class TestStartBzrSubProcess(tests.TestCase):
2559
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2561
def _subprocess_log_cleanup(self):
2562
"""Inhibits the base version as we don't produce a log file."""
2269
2564
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2565
"""Override the base version to record the command that is run.
2567
From there we can ensure it is correct without spawning a real process.
2271
2569
self.check_popen_state()
2272
2570
self._popen_args = args
2273
2571
self._popen_kwargs = kwargs
2274
2572
raise _DontSpawnProcess()
2574
def check_popen_state(self):
    """Replace to make assertions when popen is called."""
2276
2577
def test_run_bzr_subprocess_no_plugins(self):
    """With no extra options the recorded command ends in --no-plugins."""
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    # First positional argument recorded by the overridden _popen.
    command = self._popen_args[0]
    self.assertEqual(sys.executable, command[0])
    self.assertEqual(self.get_brz_path(), command[1])
    self.assertEqual(['--no-plugins'], command[2:])
2283
2584
def test_allow_plugins(self):
2284
2585
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2587
command = self._popen_args[0]
2287
2588
self.assertEqual([], command[2:])
2289
2590
def test_set_env(self):
    """env_changes values are visible at popen time, not afterwards."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    # set in the child
    def check_environment():
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR':'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2601
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        # Runs when popen is called: the variable must be gone by then.
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR':None})
    # Still set in parent
    self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    del os.environ['EXISTANT_ENV_VAR']
2313
2614
def test_env_del_missing(self):
    """Asking to remove a variable that is not set is accepted."""
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2622
def test_working_dir(self):
2322
2623
"""Test that we can specify the working dir for the child"""
2354
2654
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
2656
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2657
self.assertEqual('brz: interrupted\n', result[1])
2434
2660
class TestSelftestFiltering(tests.TestCase):
2436
2662
def setUp(self):
    """Load this module's own tests into a suite for the filter tests."""
    super(TestSelftestFiltering, self).setUp()
    self.suite = TestUtil.TestSuite()
    self.loader = TestUtil.TestLoader()
    self.suite.addTest(self.loader.loadTestsFromModule(
        sys.modules['breezy.tests.test_selftest']))
    self.all_names = _test_ids(self.suite)
2444
2670
def test_condition_id_re(self):
    """condition_id_re keeps only tests whose id matches the pattern."""
    test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
        'test_condition_id_re')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_re('test_condition_id_re'))
    self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2677
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2678
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2679
'test_condition_id_in_list']
2454
2680
id_list = tests.TestIdList(test_names)
2455
2681
filtered_suite = tests.filter_suite_by_condition(
2953
3181
def test_predefined_prefixes(self):
    """The predefined short aliases resolve to their breezy packages."""
    tpr = tests.test_prefix_alias_registry
    self.assertEqual('breezy', tpr.resolve_alias('breezy'))
    self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
    self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
    self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
    self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
    self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3191
class TestThreadLeakDetection(tests.TestCase):
3192
"""Ensure when tests leak threads we detect and report it"""
3194
class LeakRecordingResult(tests.ExtendedTestResult):
3196
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3198
def _report_thread_leak(self, test, leaks, alive):
3199
self.leaks.append((test, leaks))
3201
def test_testcase_without_addCleanups(self):
3202
"""Check old TestCase instances don't break with leak detection"""
3203
class Test(unittest.TestCase):
3206
result = self.LeakRecordingResult()
3208
result.startTestRun()
3210
result.stopTestRun()
3211
self.assertEqual(result._tests_leaking_threads_count, 0)
3212
self.assertEqual(result.leaks, [])
3214
def test_thread_leak(self):
3215
"""Ensure a thread that outlives the running of a test is reported
3217
Uses a thread that blocks on an event, and is started by the inner
3218
test case. As the thread outlives the inner case's run, it should be
3219
detected as a leak, but the event is then set so that the thread can
3220
be safely joined in cleanup so it's not leaked for real.
3222
event = threading.Event()
3223
thread = threading.Thread(name="Leaker", target=event.wait)
3224
class Test(tests.TestCase):
3225
def test_leak(self):
3227
result = self.LeakRecordingResult()
3228
test = Test("test_leak")
3229
self.addCleanup(thread.join)
3230
self.addCleanup(event.set)
3231
result.startTestRun()
3233
result.stopTestRun()
3234
self.assertEqual(result._tests_leaking_threads_count, 1)
3235
self.assertEqual(result._first_thread_leaker_id, test.id())
3236
self.assertEqual(result.leaks, [(test, {thread})])
3237
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3239
def test_multiple_leaks(self):
3240
"""Check multiple leaks are blamed on the test cases at fault
3242
Same concept as the previous test, but has one inner test method that
3243
leaks two threads, and one that doesn't leak at all.
3245
event = threading.Event()
3246
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3247
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3248
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3249
class Test(tests.TestCase):
3250
def test_first_leak(self):
3252
def test_second_no_leak(self):
3254
def test_third_leak(self):
3257
result = self.LeakRecordingResult()
3258
first_test = Test("test_first_leak")
3259
third_test = Test("test_third_leak")
3260
self.addCleanup(thread_a.join)
3261
self.addCleanup(thread_b.join)
3262
self.addCleanup(thread_c.join)
3263
self.addCleanup(event.set)
3264
result.startTestRun()
3266
[first_test, Test("test_second_no_leak"), third_test]
3268
result.stopTestRun()
3269
self.assertEqual(result._tests_leaking_threads_count, 2)
3270
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3271
self.assertEqual(result.leaks, [
3272
(first_test, {thread_b}),
3273
(third_test, {thread_a, thread_c})])
3274
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3277
class TestPostMortemDebugging(tests.TestCase):
3278
"""Check post mortem debugging works when tests fail or error"""
3280
class TracebackRecordingResult(tests.ExtendedTestResult):
3282
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3283
self.postcode = None
3284
def _post_mortem(self, tb=None):
3285
"""Record the code object at the end of the current traceback"""
3286
tb = tb or sys.exc_info()[2]
3289
while next is not None:
3292
self.postcode = tb.tb_frame.f_code
3293
def report_error(self, test, err):
3295
def report_failure(self, test, err):
3298
def test_location_unittest_error(self):
3299
"""Needs right post mortem traceback with erroring unittest case"""
3300
class Test(unittest.TestCase):
3303
result = self.TracebackRecordingResult()
3305
self.assertEqual(result.postcode, Test.runTest.__code__)
3307
def test_location_unittest_failure(self):
3308
"""Needs right post mortem traceback with failing unittest case"""
3309
class Test(unittest.TestCase):
3311
raise self.failureException
3312
result = self.TracebackRecordingResult()
3314
self.assertEqual(result.postcode, Test.runTest.__code__)
3316
def test_location_bt_error(self):
3317
"""Needs right post mortem traceback with erroring breezy.tests case"""
3318
class Test(tests.TestCase):
3319
def test_error(self):
3321
result = self.TracebackRecordingResult()
3322
Test("test_error").run(result)
3323
self.assertEqual(result.postcode, Test.test_error.__code__)
3325
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing breezy.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    result = self.TracebackRecordingResult()
    Test("test_failure").run(result)
    # The recorded code object must be the failing test method itself.
    self.assertEqual(result.postcode, Test.test_failure.__code__)
3334
def test_env_var_triggers_post_mortem(self):
3335
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3337
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3338
post_mortem_calls = []
3339
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3340
self.overrideEnv('BRZ_TEST_PDB', None)
3341
result._post_mortem(1)
3342
self.overrideEnv('BRZ_TEST_PDB', 'on')
3343
result._post_mortem(2)
3344
self.assertEqual([2], post_mortem_calls)
2963
3347
class TestRunSuite(tests.TestCase):
2976
3360
self.verbosity)
2977
3361
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3362
self.assertLength(1, calls)
3365
class _Selftest(object):
3366
"""Mixin for tests needing full selftest output"""
3368
def _inject_stream_into_subunit(self, stream):
3369
"""To be overridden by subclasses that run tests out of process"""
3371
def _run_selftest(self, **kwargs):
3373
self._inject_stream_into_subunit(sio)
3374
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3375
return sio.getvalue()
3378
class _ForkedSelftest(_Selftest):
3379
"""Mixin for tests needing full selftest output with forked children"""
3381
_test_needs_features = [features.subunit]
3383
def _inject_stream_into_subunit(self, stream):
3384
"""Monkey-patch subunit so the extra output goes to stream not stdout
3386
Some APIs need rewriting so this kind of bogus hackery can be replaced
3387
by passing the stream param from run_tests down into ProtocolTestCase.
3389
from subunit import ProtocolTestCase
3390
_original_init = ProtocolTestCase.__init__
3391
def _init_with_passthrough(self, *args, **kwargs):
3392
_original_init(self, *args, **kwargs)
3393
self._passthrough = stream
3394
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3396
def _run_selftest(self, **kwargs):
    """Run selftest with the fork decorator appended to suite_decorators.

    :param kwargs: Forwarded to the parent ``_run_selftest``.
    :raises tests.TestNotApplicable: If ``os`` has no ``fork``.
    """
    # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
    if getattr(os, "fork", None) is None:
        raise tests.TestNotApplicable("Platform doesn't support forking")
    # Make sure the fork code is actually invoked by claiming two cores
    self.overrideAttr(osutils, "local_concurrency", lambda: 2)
    kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
    return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3406
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3407
"""Check operation of --parallel=fork selftest option"""
3409
def test_error_in_child_during_fork(self):
3410
"""Error in a forked child during test setup should get reported"""
3411
class Test(tests.TestCase):
3412
def testMethod(self):
3414
# We don't care what, just break something that a child will run
3415
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3416
out = self._run_selftest(test_suite_factory=Test)
3417
# Lines from the tracebacks of the two child processes may be mixed
3418
# together due to the way subunit parses and forwards the streams,
3419
# so permit extra lines between each part of the error output.
3420
self.assertContainsRe(out,
3423
".+ in fork_for_tests\n"
3425
"\s*workaround_zealous_crypto_random\(\)\n"
3430
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3431
"""Check a test case still alive after being run emits a warning"""
3433
class Test(tests.TestCase):
3434
def test_pass(self):
3436
def test_self_ref(self):
3437
self.also_self = self.test_self_ref
3438
def test_skip(self):
3439
self.skipTest("Don't need")
3441
def _get_suite(self):
3442
return TestUtil.TestSuite([
3443
self.Test("test_pass"),
3444
self.Test("test_self_ref"),
3445
self.Test("test_skip"),
3448
def _run_selftest_with_suite(self, **kwargs):
3449
old_flags = tests.selftest_debug_flags
3450
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3451
gc_on = gc.isenabled()
3455
output = self._run_selftest(test_suite_factory=self._get_suite,
3460
tests.selftest_debug_flags = old_flags
3461
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3462
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3465
def test_testsuite(self):
    """Run the suite with no extra selftest options."""
    self._run_selftest_with_suite()
3468
def test_pattern(self):
    """A pattern selecting the other two tests keeps test_skip out."""
    out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
    self.assertNotContainsRe(out, "test_skip")
3472
def test_exclude_pattern(self):
    """Excluding test_skip keeps it out of the output."""
    out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
    self.assertNotContainsRe(out, "test_skip")
3476
def test_random_seed(self):
    """Run the suite with random_seed="now"."""
    self._run_selftest_with_suite(random_seed="now")
3479
def test_matching_tests_first(self):
    """Run the suite with matching_tests_first and a pattern."""
    self._run_selftest_with_suite(matching_tests_first=True,
        pattern="test_self_ref$")
3483
def test_starting_with_and_exclude(self):
    """Combining starting_with and exclude_pattern leaves test_skip out."""
    out = self._run_selftest_with_suite(starting_with=["bt."],
        exclude_pattern="test_skip$")
    self.assertNotContainsRe(out, "test_skip")
3488
def test_additonal_decorator(self):
3489
out = self._run_selftest_with_suite(
3490
suite_decorators=[tests.TestDecorator])
3493
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's suite using the subunit runner class."""
        return TestUncollectedWarnings._run_selftest_with_suite(self,
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3503
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking"""
3507
class TestEnvironHandling(tests.TestCase):
3509
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3510
self.assertFalse('MYVAR' in os.environ)
3511
self.overrideEnv('MYVAR', '42')
3512
# We use an embedded test to make sure we fix the _captureVar bug
3513
class Test(tests.TestCase):
3515
# The first call saves the 42 value
3516
self.overrideEnv('MYVAR', None)
3517
self.assertEqual(None, os.environ.get('MYVAR'))
3518
# Make sure we can call it twice
3519
self.overrideEnv('MYVAR', None)
3520
self.assertEqual(None, os.environ.get('MYVAR'))
3522
result = tests.TextTestResult(output, 0, 1)
3523
Test('test_me').run(result)
3524
if not result.wasStrictlySuccessful():
3525
self.fail(output.getvalue())
3526
# We get our value back
3527
self.assertEqual('42', os.environ.get('MYVAR'))
3530
class TestIsolatedEnv(tests.TestCase):
3531
"""Test isolating tests from os.environ.
3533
Since we use tests that are already isolated from os.environ a bit of care
3534
should be taken when designing the tests to avoid bootstrap side-effects.
3535
The tests start with an already clean os.environ which allows doing valid
3536
assertions about which variables are present or not and design tests around
3540
class ScratchMonkey(tests.TestCase):
3545
def test_basics(self):
    """Pin the baseline BRZ_HOME and LINES values the other tests rely on."""
    # Make sure we know the definition of BRZ_HOME: not part of os.environ
    # for tests.TestCase.
    self.assertIn('BRZ_HOME', tests.isolated_environ)
    self.assertIsNone(tests.isolated_environ['BRZ_HOME'])
    # Being part of isolated_environ, BRZ_HOME should not appear here
    self.assertNotIn('BRZ_HOME', os.environ)
    # Make sure we know the definition of LINES: part of os.environ for
    # tests.TestCase.
    self.assertIn('LINES', tests.isolated_environ)
    self.assertEqual('25', tests.isolated_environ['LINES'])
    self.assertEqual('25', os.environ['LINES'])
3558
def test_injecting_unknown_variable(self):
    """Injecting an absent variable sets it; restoring removes it again."""
    # BRZ_HOME is known to be absent from os.environ
    test = self.ScratchMonkey('test_me')
    tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
    try:
        self.assertEqual('foo', os.environ['BRZ_HOME'])
    finally:
        # Always undo the override so a failing assertion cannot leave the
        # injected value behind in os.environ.
        tests.restore_os_environ(test)
    self.assertNotIn('BRZ_HOME', os.environ)
3566
def test_injecting_known_variable(self):
    """Overriding a present variable changes it; restoring brings '25' back."""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': '42'})
    try:
        self.assertEqual('42', os.environ['LINES'])
    finally:
        # Always undo the override so a failing assertion cannot leave the
        # overridden value behind in os.environ.
        tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3574
def test_deleting_variable(self):
    """Overriding with None removes the variable; restoring brings it back."""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': None})
    try:
        self.assertNotIn('LINES', os.environ)
    finally:
        # Always undo the override so a failing assertion cannot leave the
        # variable deleted from os.environ.
        tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3583
class TestDocTestSuiteIsolation(tests.TestCase):
3584
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3586
Since tests.TestCase already provides an isolation from os.environ, we use
3587
the clean environment as a base for testing. To precisely capture the
3588
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3591
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3592
not `os.environ` so each test overrides it to suit its needs.
3596
def get_doctest_suite_for_string(self, klass, string):
3597
class Finder(doctest.DocTestFinder):
3599
def find(*args, **kwargs):
3600
test = doctest.DocTestParser().get_doctest(
3601
string, {}, 'foo', 'foo.py', 0)
3604
suite = klass(test_finder=Finder())
3607
def run_doctest_suite_for_string(self, klass, string):
3608
suite = self.get_doctest_suite_for_string(klass, string)
3610
result = tests.TextTestResult(output, 0, 1)
3612
return result, output
3614
def assertDocTestStringSucceds(self, klass, string):
    """Fail with the captured output unless the run is strictly successful.

    :param klass: Passed through to run_doctest_suite_for_string.
    :param string: Passed through to run_doctest_suite_for_string.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3619
def assertDocTestStringFails(self, klass, string):
    """Fail with the captured output if the run is strictly successful.

    :param klass: Passed through to run_doctest_suite_for_string.
    :param string: Passed through to run_doctest_suite_for_string.
    """
    result, output = self.run_doctest_suite_for_string(klass, string)
    if not result.wasStrictlySuccessful():
        return
    self.fail(output.getvalue())
3624
def test_injected_variable(self):
3625
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3628
>>> os.environ['LINES']
3631
# doctest.DocTestSuite fails as it sees '25'
3632
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3633
# tests.DocTestSuite sees '42'
3634
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3636
def test_deleted_variable(self):
3637
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3640
>>> os.environ.get('LINES')
3642
# doctest.DocTestSuite fails as it sees '25'
3643
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3644
# tests.DocTestSuite sees None
3645
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3648
class TestSelftestExcludePatterns(tests.TestCase):
3651
super(TestSelftestExcludePatterns, self).setUp()
3652
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3654
def suite_factory(self, keep_only=None, starting_with=None):
3655
"""A test suite factory with only a few tests."""
3656
class Test(tests.TestCase):
3658
# We don't need the full class path
3659
return self._testMethodName
3666
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3668
def assertTestList(self, expected, *selftest_args):
    """Assert that ``selftest --list`` prints ``expected``, one id per line.

    :param expected: List of the lines expected on standard output.
    :param selftest_args: Extra command-line arguments appended after
        ``selftest --list``.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    stdout, _stderr = self.run_bzr(command)
    self.assertEqual(expected, stdout.splitlines())
3675
def test_full_list(self):
    """Without exclusions the listing contains all three tests."""
    every_test = ['a', 'b', 'c']
    self.assertTestList(every_test)
3678
def test_single_exclude(self):
    """A single ``-x a`` leaves only ``b`` and ``c`` in the listing."""
    remaining = ['b', 'c']
    self.assertTestList(remaining, '-x', 'a')
3681
def test_mutiple_excludes(self):
    """Repeating ``-x`` for ``a`` and ``b`` leaves only ``c`` listed."""
    remaining = ['c']
    self.assertTestList(remaining, '-x', 'a', '-x', 'b')
3685
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3687
_test_needs_features = [features.subunit]
3690
super(TestCounterHooks, self).setUp()
3691
class Test(tests.TestCase):
3694
super(Test, self).setUp()
3695
self.hooks = hooks.Hooks()
3696
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3697
self.install_counter_hook(self.hooks, 'myhook')
3702
def run_hook_once(self):
3703
for hook in self.hooks['myhook']:
3706
self.test_class = Test
3708
def assertHookCalls(self, expected_calls, test_name):
3709
test = self.test_class(test_name)
3710
result = unittest.TestResult()
3712
self.assertTrue(hasattr(test, '_counters'))
3713
self.assertTrue('myhook' in test._counters)
3714
self.assertEqual(expected_calls, test._counters['myhook'])
3716
def test_no_hook(self):
    """Expect zero recorded hook calls for the ``no_hook`` test."""
    expected_calls = 0
    self.assertHookCalls(expected_calls, 'no_hook')
3719
def test_run_hook_once(self):
    """Expect one recorded hook call for the ``run_hook_once`` test.

    Skipped when the installed testtools is older than 0.9.8.
    """
    minimum_version = (0, 9, 8)
    testtools_feature = features.testtools
    if testtools_feature.module.__version__ < minimum_version:
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    self.assertHookCalls(1, 'run_hook_once')