603
639
def test_dangling_locks_cause_failures(self):
604
640
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
641
def test_function(self):
606
t = self.get_transport('.')
642
t = self.get_transport_from_path('.')
607
643
l = lockdir.LockDir(t, 'lock')
610
646
test = TestDanglingLock('test_function')
611
647
result = test.run()
648
total_failures = result.errors + result.failures
612
649
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
650
self.assertEqual(1, len(total_failures))
615
652
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
654
self.assertEqual(0, len(total_failures))
620
657
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
658
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
660
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
661
from ..transport.readonly import ReadonlyTransportDecorator
626
662
self.vfs_transport_factory = memory.MemoryServer
627
663
self.transport_readonly_server = None
628
664
# calling get_readonly_transport() constructs a decorator on the url
630
666
url = self.get_readonly_url()
631
667
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
668
t = transport.get_transport_from_url(url)
669
t2 = transport.get_transport_from_url(url2)
670
self.assertIsInstance(t, ReadonlyTransportDecorator)
671
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
672
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
674
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
675
from .http_server import HttpServer
676
from ..transport.http import HttpTransportBase
642
677
self.transport_server = test_server.LocalURLServer
643
678
self.transport_readonly_server = HttpServer
644
679
# calling get_readonly_transport() gives us a HTTP server instance.
645
680
url = self.get_readonly_url()
646
681
url2 = self.get_readonly_url('foo/bar')
647
682
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
683
t = transport.get_transport_from_url(url)
684
t2 = transport.get_transport_from_url(url2)
685
self.assertIsInstance(t, HttpTransportBase)
686
self.assertIsInstance(t2, HttpTransportBase)
652
687
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
689
def test_is_directory(self):
1643
1673
class Test(tests.TestCase):
1645
1675
def setUp(self):
1646
tests.TestCase.setUp(self)
1676
super(Test, self).setUp()
1647
1677
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1679
def test_value(self):
1650
1680
self.assertEqual('original', self.orig)
1651
1681
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1683
self._run_successful_test(Test('test_value'))
1655
1684
self.assertEqual('original', obj.test_attr)
1686
def test_overrideAttr_with_no_existing_value_and_value(self):
1687
# Do not define the test_attribute
1688
obj = self # Make 'obj' visible to the embedded test
1689
class Test(tests.TestCase):
1692
tests.TestCase.setUp(self)
1693
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1695
def test_value(self):
1696
self.assertEqual(tests._unitialized_attr, self.orig)
1697
self.assertEqual('modified', obj.test_attr)
1699
self._run_successful_test(Test('test_value'))
1700
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1702
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1703
# Do not define the test_attribute
1704
obj = self # Make 'obj' visible to the embedded test
1705
class Test(tests.TestCase):
1708
tests.TestCase.setUp(self)
1709
self.orig = self.overrideAttr(obj, 'test_attr')
1711
def test_value(self):
1712
self.assertEqual(tests._unitialized_attr, self.orig)
1713
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1715
self._run_successful_test(Test('test_value'))
1716
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1718
def test_recordCalls(self):
1719
from breezy.tests import test_selftest
1720
calls = self.recordCalls(
1721
test_selftest, '_add_numbers')
1722
self.assertEqual(test_selftest._add_numbers(2, 10),
1724
self.assertEqual(calls, [((2, 10), {})])
1727
def _add_numbers(a, b):
1731
class _MissingFeature(features.Feature):
1734
missing_feature = _MissingFeature()
1737
def _get_test(name):
1738
"""Get an instance of a specific example test.
1740
We protect this in a function so that they don't auto-run in the test
1744
class ExampleTests(tests.TestCase):
1746
def test_fail(self):
1747
mutter('this was a failing test')
1748
self.fail('this test will fail')
1750
def test_error(self):
1751
mutter('this test errored')
1752
raise RuntimeError('gotcha')
1754
def test_missing_feature(self):
1755
mutter('missing the feature')
1756
self.requireFeature(missing_feature)
1758
def test_skip(self):
1759
mutter('this test will be skipped')
1760
raise tests.TestSkipped('reason')
1762
def test_success(self):
1763
mutter('this test succeeds')
1765
def test_xfail(self):
1766
mutter('test with expected failure')
1767
self.knownFailure('this_fails')
1769
def test_unexpected_success(self):
1770
mutter('test with unexpected success')
1771
self.expectFailure('should_fail', lambda: None)
1773
return ExampleTests(name)
1776
class TestTestCaseLogDetails(tests.TestCase):
1778
def _run_test(self, test_name):
1779
test = _get_test(test_name)
1780
result = testtools.TestResult()
1784
def test_fail_has_log(self):
1785
result = self._run_test('test_fail')
1786
self.assertEqual(1, len(result.failures))
1787
result_content = result.failures[0][1]
1788
self.assertContainsRe(result_content,
1789
'(?m)^(?:Text attachment: )?log(?:$|: )')
1790
self.assertContainsRe(result_content, 'this was a failing test')
1792
def test_error_has_log(self):
1793
result = self._run_test('test_error')
1794
self.assertEqual(1, len(result.errors))
1795
result_content = result.errors[0][1]
1796
self.assertContainsRe(result_content,
1797
'(?m)^(?:Text attachment: )?log(?:$|: )')
1798
self.assertContainsRe(result_content, 'this test errored')
1800
def test_skip_has_no_log(self):
1801
result = self._run_test('test_skip')
1802
self.assertEqual(['reason'], result.skip_reasons.keys())
1803
skips = result.skip_reasons['reason']
1804
self.assertEqual(1, len(skips))
1806
self.assertFalse('log' in test.getDetails())
1808
def test_missing_feature_has_no_log(self):
1809
# testtools doesn't know about addNotSupported, so it just gets
1810
# considered as a skip
1811
result = self._run_test('test_missing_feature')
1812
self.assertEqual([missing_feature], result.skip_reasons.keys())
1813
skips = result.skip_reasons[missing_feature]
1814
self.assertEqual(1, len(skips))
1816
self.assertFalse('log' in test.getDetails())
1818
def test_xfail_has_no_log(self):
1819
result = self._run_test('test_xfail')
1820
self.assertEqual(1, len(result.expectedFailures))
1821
result_content = result.expectedFailures[0][1]
1822
self.assertNotContainsRe(result_content,
1823
'(?m)^(?:Text attachment: )?log(?:$|: )')
1824
self.assertNotContainsRe(result_content, 'test with expected failure')
1826
def test_unexpected_success_has_log(self):
1827
result = self._run_test('test_unexpected_success')
1828
self.assertEqual(1, len(result.unexpectedSuccesses))
1829
# Inconsistency, unexpectedSuccesses is a list of tests,
1830
# expectedFailures is a list of reasons?
1831
test = result.unexpectedSuccesses[0]
1832
details = test.getDetails()
1833
self.assertTrue('log' in details)
1836
class TestTestCloning(tests.TestCase):
1837
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1839
def test_cloned_testcase_does_not_share_details(self):
1840
"""A TestCase cloned with clone_test does not share mutable attributes
1841
such as details or cleanups.
1843
class Test(tests.TestCase):
1845
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1846
orig_test = Test('test_foo')
1847
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1848
orig_test.run(unittest.TestResult())
1849
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1850
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1852
def test_double_apply_scenario_preserves_first_scenario(self):
1853
"""Applying two levels of scenarios to a test preserves the attributes
1854
added by both scenarios.
1856
class Test(tests.TestCase):
1859
test = Test('test_foo')
1860
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1861
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1862
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1863
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1864
all_tests = list(tests.iter_suite_tests(suite))
1865
self.assertLength(4, all_tests)
1866
all_xys = sorted((t.x, t.y) for t in all_tests)
1867
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1870
# NB: Don't delete this; it's not actually from 0.11!
1659
1871
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2180
load_list='missing file name', list_only=True)
2183
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2185
_test_needs_features = [features.subunit]
2187
def run_subunit_stream(self, test_name):
2188
from subunit import ProtocolTestCase
2190
return TestUtil.TestSuite([_get_test(test_name)])
2191
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2192
test_suite_factory=factory)
2193
test = ProtocolTestCase(stream)
2194
result = testtools.TestResult()
2196
content = stream.getvalue()
2197
return content, result
2199
def test_fail_has_log(self):
2200
content, result = self.run_subunit_stream('test_fail')
2201
self.assertEqual(1, len(result.failures))
2202
self.assertContainsRe(content, '(?m)^log$')
2203
self.assertContainsRe(content, 'this test will fail')
2205
def test_error_has_log(self):
2206
content, result = self.run_subunit_stream('test_error')
2207
self.assertContainsRe(content, '(?m)^log$')
2208
self.assertContainsRe(content, 'this test errored')
2210
def test_skip_has_no_log(self):
2211
content, result = self.run_subunit_stream('test_skip')
2212
self.assertNotContainsRe(content, '(?m)^log$')
2213
self.assertNotContainsRe(content, 'this test will be skipped')
2214
self.assertEqual(['reason'], result.skip_reasons.keys())
2215
skips = result.skip_reasons['reason']
2216
self.assertEqual(1, len(skips))
2218
# RemotedTestCase doesn't preserve the "details"
2219
## self.assertFalse('log' in test.getDetails())
2221
def test_missing_feature_has_no_log(self):
2222
content, result = self.run_subunit_stream('test_missing_feature')
2223
self.assertNotContainsRe(content, '(?m)^log$')
2224
self.assertNotContainsRe(content, 'missing the feature')
2225
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2226
skips = result.skip_reasons['_MissingFeature\n']
2227
self.assertEqual(1, len(skips))
2229
# RemotedTestCase doesn't preserve the "details"
2230
## self.assertFalse('log' in test.getDetails())
2232
def test_xfail_has_no_log(self):
2233
content, result = self.run_subunit_stream('test_xfail')
2234
self.assertNotContainsRe(content, '(?m)^log$')
2235
self.assertNotContainsRe(content, 'test with expected failure')
2236
self.assertEqual(1, len(result.expectedFailures))
2237
result_content = result.expectedFailures[0][1]
2238
self.assertNotContainsRe(result_content,
2239
'(?m)^(?:Text attachment: )?log(?:$|: )')
2240
self.assertNotContainsRe(result_content, 'test with expected failure')
2242
def test_unexpected_success_has_log(self):
2243
content, result = self.run_subunit_stream('test_unexpected_success')
2244
self.assertContainsRe(content, '(?m)^log$')
2245
self.assertContainsRe(content, 'test with unexpected success')
2246
# GZ 2011-05-18: Old versions of subunit treat unexpected success as a
2247
# success; if a min version check is added, remove this
2248
from subunit import TestProtocolClient as _Client
2249
if _Client.addUnexpectedSuccess.__func__ is _Client.addSuccess.__func__:
2250
self.expectFailure('subunit treats "unexpectedSuccess"'
2251
' as a plain success',
2252
self.assertEqual, 1, len(result.unexpectedSuccesses))
2253
self.assertEqual(1, len(result.unexpectedSuccesses))
2254
test = result.unexpectedSuccesses[0]
2255
# RemotedTestCase doesn't preserve the "details"
2256
## self.assertTrue('log' in test.getDetails())
2258
def test_success_has_no_log(self):
2259
content, result = self.run_subunit_stream('test_success')
2260
self.assertEqual(1, result.testsRun)
2261
self.assertNotContainsRe(content, '(?m)^log$')
2262
self.assertNotContainsRe(content, 'this test succeeds')
1974
2265
class TestRunBzr(tests.TestCase):
2264
2555
class TestStartBzrSubProcess(tests.TestCase):
2556
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2558
def _subprocess_log_cleanup(self):
2559
"""Inhibits the base version as we don't produce a log file."""
2269
2561
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2562
"""Override the base version to record the command that is run.
2564
From there we can ensure it is correct without spawning a real process.
2271
2566
self.check_popen_state()
2272
2567
self._popen_args = args
2273
2568
self._popen_kwargs = kwargs
2274
2569
raise _DontSpawnProcess()
2571
def check_popen_state(self):
2572
"""Replace to make assertions when popen is called."""
2276
2574
def test_run_bzr_subprocess_no_plugins(self):
2277
2575
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2576
command = self._popen_args[0]
2279
2577
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2578
self.assertEqual(self.get_brz_path(), command[1])
2281
2579
self.assertEqual(['--no-plugins'], command[2:])
2283
2581
def test_allow_plugins(self):
2284
2582
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2584
command = self._popen_args[0]
2287
2585
self.assertEqual([], command[2:])
2289
2587
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2588
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2589
# set in the child
2292
2590
def check_environment():
2293
2591
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2592
self.check_popen_state = check_environment
2295
2593
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2594
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2297
2595
# not set in the parent
2298
2596
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2598
def test_run_bzr_subprocess_env_del(self):
2301
2599
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2600
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2601
def check_environment():
2304
2602
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2603
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2604
self.check_popen_state = check_environment
2307
2605
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2606
env_changes={'EXISTANT_ENV_VAR':None})
2309
2607
# Still set in parent
2310
2608
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2609
del os.environ['EXISTANT_ENV_VAR']
2313
2611
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2612
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2613
def check_environment():
2316
2614
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2615
self.check_popen_state = check_environment
2318
2616
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2617
env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2619
def test_working_dir(self):
2322
2620
"""Test that we can specify the working dir for the child"""
2354
2651
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
2653
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2654
self.assertEqual('brz: interrupted\n', result[1])
2434
2657
class TestSelftestFiltering(tests.TestCase):
2436
2659
def setUp(self):
2437
tests.TestCase.setUp(self)
2660
super(TestSelftestFiltering, self).setUp()
2438
2661
self.suite = TestUtil.TestSuite()
2439
2662
self.loader = TestUtil.TestLoader()
2440
2663
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2664
sys.modules['breezy.tests.test_selftest']))
2442
2665
self.all_names = _test_ids(self.suite)
2444
2667
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2668
test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
2446
2669
'test_condition_id_re')
2447
2670
filtered_suite = tests.filter_suite_by_condition(
2448
2671
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2672
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2674
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2675
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2676
'test_condition_id_in_list']
2454
2677
id_list = tests.TestIdList(test_names)
2455
2678
filtered_suite = tests.filter_suite_by_condition(
2953
3178
def test_predefined_prefixes(self):
2954
3179
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3180
self.assertEqual('breezy', tpr.resolve_alias('breezy'))
3181
self.assertEqual('breezy.doc', tpr.resolve_alias('bd'))
3182
self.assertEqual('breezy.utils', tpr.resolve_alias('bu'))
3183
self.assertEqual('breezy.tests', tpr.resolve_alias('bt'))
3184
self.assertEqual('breezy.tests.blackbox', tpr.resolve_alias('bb'))
3185
self.assertEqual('breezy.plugins', tpr.resolve_alias('bp'))
3188
class TestThreadLeakDetection(tests.TestCase):
3189
"""Ensure when tests leak threads we detect and report it"""
3191
class LeakRecordingResult(tests.ExtendedTestResult):
3193
tests.ExtendedTestResult.__init__(self, BytesIO(), 0, 1)
3195
def _report_thread_leak(self, test, leaks, alive):
3196
self.leaks.append((test, leaks))
3198
def test_testcase_without_addCleanups(self):
3199
"""Check old TestCase instances don't break with leak detection"""
3200
class Test(unittest.TestCase):
3203
result = self.LeakRecordingResult()
3205
result.startTestRun()
3207
result.stopTestRun()
3208
self.assertEqual(result._tests_leaking_threads_count, 0)
3209
self.assertEqual(result.leaks, [])
3211
def test_thread_leak(self):
3212
"""Ensure a thread that outlives the running of a test is reported
3214
Uses a thread that blocks on an event, and is started by the inner
3215
test case. As the thread outlives the inner case's run, it should be
3216
detected as a leak, but the event is then set so that the thread can
3217
be safely joined in cleanup so it's not leaked for real.
3219
event = threading.Event()
3220
thread = threading.Thread(name="Leaker", target=event.wait)
3221
class Test(tests.TestCase):
3222
def test_leak(self):
3224
result = self.LeakRecordingResult()
3225
test = Test("test_leak")
3226
self.addCleanup(thread.join)
3227
self.addCleanup(event.set)
3228
result.startTestRun()
3230
result.stopTestRun()
3231
self.assertEqual(result._tests_leaking_threads_count, 1)
3232
self.assertEqual(result._first_thread_leaker_id, test.id())
3233
self.assertEqual(result.leaks, [(test, {thread})])
3234
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3236
def test_multiple_leaks(self):
3237
"""Check multiple leaks are blamed on the test cases at fault
3239
Same concept as the previous test, but has one inner test method that
3240
leaks two threads, and one that doesn't leak at all.
3242
event = threading.Event()
3243
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3244
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3245
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3246
class Test(tests.TestCase):
3247
def test_first_leak(self):
3249
def test_second_no_leak(self):
3251
def test_third_leak(self):
3254
result = self.LeakRecordingResult()
3255
first_test = Test("test_first_leak")
3256
third_test = Test("test_third_leak")
3257
self.addCleanup(thread_a.join)
3258
self.addCleanup(thread_b.join)
3259
self.addCleanup(thread_c.join)
3260
self.addCleanup(event.set)
3261
result.startTestRun()
3263
[first_test, Test("test_second_no_leak"), third_test]
3265
result.stopTestRun()
3266
self.assertEqual(result._tests_leaking_threads_count, 2)
3267
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3268
self.assertEqual(result.leaks, [
3269
(first_test, {thread_b}),
3270
(third_test, {thread_a, thread_c})])
3271
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3274
class TestPostMortemDebugging(tests.TestCase):
3275
"""Check post mortem debugging works when tests fail or error"""
3277
class TracebackRecordingResult(tests.ExtendedTestResult):
3279
tests.ExtendedTestResult.__init__(self, BytesIO(), 0, 1)
3280
self.postcode = None
3281
def _post_mortem(self, tb=None):
3282
"""Record the code object at the end of the current traceback"""
3283
tb = tb or sys.exc_info()[2]
3286
while next is not None:
3289
self.postcode = tb.tb_frame.f_code
3290
def report_error(self, test, err):
3292
def report_failure(self, test, err):
3295
def test_location_unittest_error(self):
3296
"""Needs right post mortem traceback with erroring unittest case"""
3297
class Test(unittest.TestCase):
3300
result = self.TracebackRecordingResult()
3302
self.assertEqual(result.postcode, Test.runTest.__code__)
3304
def test_location_unittest_failure(self):
3305
"""Needs right post mortem traceback with failing unittest case"""
3306
class Test(unittest.TestCase):
3308
raise self.failureException
3309
result = self.TracebackRecordingResult()
3311
self.assertEqual(result.postcode, Test.runTest.__code__)
3313
def test_location_bt_error(self):
3314
"""Needs right post mortem traceback with erroring breezy.tests case"""
3315
class Test(tests.TestCase):
3316
def test_error(self):
3318
result = self.TracebackRecordingResult()
3319
Test("test_error").run(result)
3320
self.assertEqual(result.postcode, Test.test_error.__code__)
3322
def test_location_bt_failure(self):
3323
"""Needs right post mortem traceback with failing breezy.tests case"""
3324
class Test(tests.TestCase):
3325
def test_failure(self):
3326
raise self.failureException
3327
result = self.TracebackRecordingResult()
3328
Test("test_failure").run(result)
3329
self.assertEqual(result.postcode, Test.test_failure.__code__)
3331
def test_env_var_triggers_post_mortem(self):
3332
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3334
result = tests.ExtendedTestResult(BytesIO(), 0, 1)
3335
post_mortem_calls = []
3336
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3337
self.overrideEnv('BRZ_TEST_PDB', None)
3338
result._post_mortem(1)
3339
self.overrideEnv('BRZ_TEST_PDB', 'on')
3340
result._post_mortem(2)
3341
self.assertEqual([2], post_mortem_calls)
2963
3344
class TestRunSuite(tests.TestCase):
2974
3355
calls.append(test)
2975
3356
return tests.ExtendedTestResult(self.stream, self.descriptions,
2976
3357
self.verbosity)
2977
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3358
tests.run_suite(suite, runner_class=MyRunner, stream=BytesIO())
2978
3359
self.assertLength(1, calls)
3362
class _Selftest(object):
3363
"""Mixin for tests needing full selftest output"""
3365
def _inject_stream_into_subunit(self, stream):
3366
"""To be overridden by subclasses that run tests out of process"""
3368
def _run_selftest(self, **kwargs):
3370
self._inject_stream_into_subunit(sio)
3371
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3372
return sio.getvalue()
3375
class _ForkedSelftest(_Selftest):
3376
"""Mixin for tests needing full selftest output with forked children"""
3378
_test_needs_features = [features.subunit]
3380
def _inject_stream_into_subunit(self, stream):
3381
"""Monkey-patch subunit so the extra output goes to stream not stdout
3383
Some APIs need rewriting so this kind of bogus hackery can be replaced
3384
by passing the stream param from run_tests down into ProtocolTestCase.
3386
from subunit import ProtocolTestCase
3387
_original_init = ProtocolTestCase.__init__
3388
def _init_with_passthrough(self, *args, **kwargs):
3389
_original_init(self, *args, **kwargs)
3390
self._passthrough = stream
3391
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3393
def _run_selftest(self, **kwargs):
3394
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3395
if getattr(os, "fork", None) is None:
3396
raise tests.TestNotApplicable("Platform doesn't support forking")
3397
# Make sure the fork code is actually invoked by claiming two cores
3398
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3399
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3400
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3403
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3404
"""Check operation of --parallel=fork selftest option"""
3406
def test_error_in_child_during_fork(self):
3407
"""Error in a forked child during test setup should get reported"""
3408
class Test(tests.TestCase):
3409
def testMethod(self):
3411
# We don't care what, just break something that a child will run
3412
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3413
out = self._run_selftest(test_suite_factory=Test)
3414
# Lines from the tracebacks of the two child processes may be mixed
3415
# together due to the way subunit parses and forwards the streams,
3416
# so permit extra lines between each part of the error output.
3417
self.assertContainsRe(out,
3420
".+ in fork_for_tests\n"
3422
"\s*workaround_zealous_crypto_random\(\)\n"
3427
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3428
"""Check a test case still alive after being run emits a warning"""
3430
class Test(tests.TestCase):
3431
def test_pass(self):
3433
def test_self_ref(self):
3434
self.also_self = self.test_self_ref
3435
def test_skip(self):
3436
self.skipTest("Don't need")
3438
def _get_suite(self):
3439
return TestUtil.TestSuite([
3440
self.Test("test_pass"),
3441
self.Test("test_self_ref"),
3442
self.Test("test_skip"),
3445
def _run_selftest_with_suite(self, **kwargs):
3446
old_flags = tests.selftest_debug_flags
3447
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3448
gc_on = gc.isenabled()
3452
output = self._run_selftest(test_suite_factory=self._get_suite,
3457
tests.selftest_debug_flags = old_flags
3458
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3459
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3462
def test_testsuite(self):
3463
self._run_selftest_with_suite()
3465
def test_pattern(self):
3466
out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$")
3467
self.assertNotContainsRe(out, "test_skip")
3469
def test_exclude_pattern(self):
3470
out = self._run_selftest_with_suite(exclude_pattern="test_skip$")
3471
self.assertNotContainsRe(out, "test_skip")
3473
def test_random_seed(self):
3474
self._run_selftest_with_suite(random_seed="now")
3476
def test_matching_tests_first(self):
3477
self._run_selftest_with_suite(matching_tests_first=True,
3478
pattern="test_self_ref$")
3480
def test_starting_with_and_exclude(self):
3481
out = self._run_selftest_with_suite(starting_with=["bt."],
3482
exclude_pattern="test_skip$")
3483
self.assertNotContainsRe(out, "test_skip")
3485
def test_additonal_decorator(self):
3486
out = self._run_selftest_with_suite(
3487
suite_decorators=[tests.TestDecorator])
3490
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
3491
"""Check warnings from tests staying alive are emitted with subunit"""
3493
_test_needs_features = [features.subunit]
3495
def _run_selftest_with_suite(self, **kwargs):
3496
return TestUncollectedWarnings._run_selftest_with_suite(self,
3497
runner_class=tests.SubUnitBzrRunner, **kwargs)
3500
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3501
"""Check warnings from tests staying alive are emitted when forking"""
3504
class TestEnvironHandling(tests.TestCase):
3506
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3507
self.assertFalse('MYVAR' in os.environ)
3508
self.overrideEnv('MYVAR', '42')
3509
# We use an embedded test to make sure we fix the _captureVar bug
3510
class Test(tests.TestCase):
3512
# The first call saves the 42 value
3513
self.overrideEnv('MYVAR', None)
3514
self.assertEqual(None, os.environ.get('MYVAR'))
3515
# Make sure we can call it twice
3516
self.overrideEnv('MYVAR', None)
3517
self.assertEqual(None, os.environ.get('MYVAR'))
3519
result = tests.TextTestResult(output, 0, 1)
3520
Test('test_me').run(result)
3521
if not result.wasStrictlySuccessful():
3522
self.fail(output.getvalue())
3523
# We get our value back
3524
self.assertEqual('42', os.environ.get('MYVAR'))
3527
class TestIsolatedEnv(tests.TestCase):
3528
"""Test isolating tests from os.environ.
3530
Since we use tests that are already isolated from os.environ a bit of care
3531
should be taken when designing the tests to avoid bootstrap side-effects.
3532
The tests start with an already clean os.environ which allows doing valid
3533
assertions about which variables are present or not and design tests around
3537
class ScratchMonkey(tests.TestCase):
3542
def test_basics(self):
3543
# Make sure we know the definition of BRZ_HOME: not part of os.environ
3544
# for tests.TestCase.
3545
self.assertTrue('BRZ_HOME' in tests.isolated_environ)
3546
self.assertEqual(None, tests.isolated_environ['BRZ_HOME'])
3547
# Being part of isolated_environ, BRZ_HOME should not appear here
3548
self.assertFalse('BRZ_HOME' in os.environ)
3549
# Make sure we know the definition of LINES: part of os.environ for
3551
self.assertTrue('LINES' in tests.isolated_environ)
3552
self.assertEqual('25', tests.isolated_environ['LINES'])
3553
self.assertEqual('25', os.environ['LINES'])
3555
def test_injecting_unknown_variable(self):
    """Override, then restore, a variable that is absent from os.environ."""
    # BRZ_HOME is known to be absent from os.environ
    test = self.ScratchMonkey('test_me')
    tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
    try:
        self.assertEqual('foo', os.environ['BRZ_HOME'])
    finally:
        # Always undo the override, even when the assertion above fails, so
        # the injected value cannot outlive this check.
        tests.restore_os_environ(test)
    self.assertFalse('BRZ_HOME' in os.environ)
3563
def test_injecting_known_variable(self):
    """Override, then restore, a variable that is present in os.environ."""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': '42'})
    try:
        self.assertEqual('42', os.environ['LINES'])
    finally:
        # Always undo the override, even when the assertion above fails, so
        # the injected value cannot outlive this check.
        tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3571
def test_deleting_variable(self):
    """Overriding a variable with None removes it until it is restored."""
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    tests.override_os_environ(test, {'LINES': None})
    try:
        self.assertTrue('LINES' not in os.environ)
    finally:
        # Always undo the override, even when the assertion above fails, so
        # the deletion cannot outlive this check.
        tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3580
class TestDocTestSuiteIsolation(tests.TestCase):
3581
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3583
Since tests.TestCase already provides isolation from os.environ, we use
3584
the clean environment as a base for testing. To precisely capture the
3585
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3588
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3589
not `os.environ` so each test overrides it to suit its needs.
3593
def get_doctest_suite_for_string(self, klass, string):
3594
class Finder(doctest.DocTestFinder):
3596
def find(*args, **kwargs):
3597
test = doctest.DocTestParser().get_doctest(
3598
string, {}, 'foo', 'foo.py', 0)
3601
suite = klass(test_finder=Finder())
3604
def run_doctest_suite_for_string(self, klass, string):
3605
suite = self.get_doctest_suite_for_string(klass, string)
3607
result = tests.TextTestResult(output, 0, 1)
3609
return result, output
3611
def assertDocTestStringSucceds(self, klass, string):
    """Fail, reporting the captured output, unless the run strictly succeeds.

    :param klass: Passed through to ``run_doctest_suite_for_string``.
    :param string: Passed through to ``run_doctest_suite_for_string``.
    """
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if outcome.wasStrictlySuccessful():
        return
    self.fail(stream.getvalue())
3616
def assertDocTestStringFails(self, klass, string):
    """Fail, reporting the captured output, if the run strictly succeeds.

    :param klass: Passed through to ``run_doctest_suite_for_string``.
    :param string: Passed through to ``run_doctest_suite_for_string``.
    """
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if not outcome.wasStrictlySuccessful():
        return
    self.fail(stream.getvalue())
3621
def test_injected_variable(self):
3622
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3625
>>> os.environ['LINES']
3628
# doctest.DocTestSuite fails as it sees '25'
3629
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3630
# tests.DocTestSuite sees '42'
3631
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3633
def test_deleted_variable(self):
3634
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3637
>>> os.environ.get('LINES')
3639
# doctest.DocTestSuite fails as it sees '25'
3640
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3641
# tests.DocTestSuite sees None
3642
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3645
class TestSelftestExcludePatterns(tests.TestCase):
3648
super(TestSelftestExcludePatterns, self).setUp()
3649
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3651
def suite_factory(self, keep_only=None, starting_with=None):
3652
"""A test suite factory with only a few tests."""
3653
class Test(tests.TestCase):
3655
# We don't need the full class path
3656
return self._testMethodName
3663
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3665
def assertTestList(self, expected, *selftest_args):
    """Check the lines printed by ``selftest --list`` against ``expected``.

    :param expected: The expected list of output lines.
    :param selftest_args: Extra command-line arguments appended after
        ``selftest --list``.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    out, _ = self.run_bzr(command)
    self.assertEqual(expected, out.splitlines())
3672
def test_full_list(self):
3673
self.assertTestList(['a', 'b', 'c'])
3675
def test_single_exclude(self):
3676
self.assertTestList(['b', 'c'], '-x', 'a')
3678
def test_mutiple_excludes(self):
3679
self.assertTestList(['c'], '-x', 'a', '-x', 'b')
3682
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3684
_test_needs_features = [features.subunit]
3687
super(TestCounterHooks, self).setUp()
3688
class Test(tests.TestCase):
3691
super(Test, self).setUp()
3692
self.hooks = hooks.Hooks()
3693
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3694
self.install_counter_hook(self.hooks, 'myhook')
3699
def run_hook_once(self):
3700
for hook in self.hooks['myhook']:
3703
self.test_class = Test
3705
def assertHookCalls(self, expected_calls, test_name):
3706
test = self.test_class(test_name)
3707
result = unittest.TestResult()
3709
self.assertTrue(hasattr(test, '_counters'))
3710
self.assertTrue('myhook' in test._counters)
3711
self.assertEqual(expected_calls, test._counters['myhook'])
3713
def test_no_hook(self):
3714
self.assertHookCalls(0, 'no_hook')
3716
def test_run_hook_once(self):
    """The 'run_hook_once' test is expected to record one 'myhook' call.

    Skipped when the available testtools is older than 0.9.8.
    """
    minimum_version = (0, 9, 8)
    installed_version = features.testtools.module.__version__
    if installed_version < minimum_version:
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    self.assertHookCalls(1, 'run_hook_once')