603
652
def test_dangling_locks_cause_failures(self):
604
653
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
654
def test_function(self):
606
t = self.get_transport('.')
655
t = self.get_transport_from_path('.')
607
656
l = lockdir.LockDir(t, 'lock')
610
659
test = TestDanglingLock('test_function')
611
660
result = test.run()
661
total_failures = result.errors + result.failures
612
662
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
663
self.assertEqual(1, len(total_failures))
615
665
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
667
self.assertEqual(0, len(total_failures))
620
670
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
671
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
673
def test_get_readonly_url_none(self):
    """Readonly URLs open as decorators when no readonly server is set.

    Sets the vfs factory to ``memory.MemoryServer`` and
    ``transport_readonly_server`` to None, then checks that both the base
    readonly URL and the ``foo/bar`` readonly URL open as
    ``ReadonlyTransportDecorator`` instances that agree on the location
    of ``foo/bar``.
    """
    from ..transport.readonly import ReadonlyTransportDecorator
    self.vfs_transport_factory = memory.MemoryServer
    self.transport_readonly_server = None
    # calling get_readonly_transport() constructs a decorator on the url
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    t = transport.get_transport_from_url(url)
    t2 = transport.get_transport_from_url(url2)
    self.assertIsInstance(t, ReadonlyTransportDecorator)
    self.assertIsInstance(t2, ReadonlyTransportDecorator)
    # The last character of base is dropped before comparing; presumably
    # a trailing '/' that abspath() does not emit -- TODO confirm.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
687
def test_get_readonly_url_http(self):
    """Readonly URLs open as HTTP transports when HttpServer is configured.

    Uses ``test_server.LocalURLServer`` as the transport server and
    ``HttpServer`` as the readonly server, then checks that both the base
    readonly URL and the ``foo/bar`` readonly URL open as
    ``HttpTransportBase`` instances that agree on the location of
    ``foo/bar``.
    """
    from ..http_server import HttpServer
    from ..transport.http import HttpTransportBase
    self.transport_server = test_server.LocalURLServer
    self.transport_readonly_server = HttpServer
    # calling get_readonly_transport() gives us a HTTP server instance.
    url = self.get_readonly_url()
    url2 = self.get_readonly_url('foo/bar')
    # the transport returned may be any HttpTransportBase subclass
    t = transport.get_transport_from_url(url)
    t2 = transport.get_transport_from_url(url2)
    self.assertIsInstance(t, HttpTransportBase)
    self.assertIsInstance(t2, HttpTransportBase)
    # The last character of base is dropped before comparing; presumably
    # a trailing '/' that abspath() does not emit -- TODO confirm.
    self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
702
def test_is_directory(self):
1643
1686
class Test(tests.TestCase):
1645
1688
def setUp(self):
1646
tests.TestCase.setUp(self)
1689
super(Test, self).setUp()
1647
1690
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1692
def test_value(self):
1650
1693
self.assertEqual('original', self.orig)
1651
1694
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1696
self._run_successful_test(Test('test_value'))
1655
1697
self.assertEqual('original', obj.test_attr)
1699
def test_overrideAttr_with_no_existing_value_and_value(self):
1700
# Do not define the test_attribute
1701
obj = self # Make 'obj' visible to the embedded test
1702
class Test(tests.TestCase):
1705
tests.TestCase.setUp(self)
1706
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1708
def test_value(self):
1709
self.assertEqual(tests._unitialized_attr, self.orig)
1710
self.assertEqual('modified', obj.test_attr)
1712
self._run_successful_test(Test('test_value'))
1713
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1715
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1716
# Do not define the test_attribute
1717
obj = self # Make 'obj' visible to the embedded test
1718
class Test(tests.TestCase):
1721
tests.TestCase.setUp(self)
1722
self.orig = self.overrideAttr(obj, 'test_attr')
1724
def test_value(self):
1725
self.assertEqual(tests._unitialized_attr, self.orig)
1726
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1728
self._run_successful_test(Test('test_value'))
1729
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1731
def test_recordCalls(self):
1732
from breezy.tests import test_selftest
1733
calls = self.recordCalls(
1734
test_selftest, '_add_numbers')
1735
self.assertEqual(test_selftest._add_numbers(2, 10),
1737
self.assertEqual(calls, [((2, 10), {})])
1740
def _add_numbers(a, b):
1744
class _MissingFeature(features.Feature):
1747
missing_feature = _MissingFeature()
1750
def _get_test(name):
1751
"""Get an instance of a specific example test.
1753
We protect this in a function so that they don't auto-run in the test
1757
class ExampleTests(tests.TestCase):
1759
def test_fail(self):
1760
mutter('this was a failing test')
1761
self.fail('this test will fail')
1763
def test_error(self):
1764
mutter('this test errored')
1765
raise RuntimeError('gotcha')
1767
def test_missing_feature(self):
1768
mutter('missing the feature')
1769
self.requireFeature(missing_feature)
1771
def test_skip(self):
1772
mutter('this test will be skipped')
1773
raise tests.TestSkipped('reason')
1775
def test_success(self):
1776
mutter('this test succeeds')
1778
def test_xfail(self):
1779
mutter('test with expected failure')
1780
self.knownFailure('this_fails')
1782
def test_unexpected_success(self):
1783
mutter('test with unexpected success')
1784
self.expectFailure('should_fail', lambda: None)
1786
return ExampleTests(name)
1789
class TestTestCaseLogDetails(tests.TestCase):
1791
def _run_test(self, test_name):
1792
test = _get_test(test_name)
1793
result = testtools.TestResult()
1797
def test_fail_has_log(self):
    """The single failure recorded for 'test_fail' carries the log text."""
    failures = self._run_test('test_fail').failures
    self.assertEqual(1, len(failures))
    # Each entry is indexed as (test, text); only the text is inspected.
    failure_text = failures[0][1]
    self.assertContainsRe(
        failure_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(failure_text, 'this was a failing test')
1805
def test_error_has_log(self):
    """The single error recorded for 'test_error' carries the log text."""
    errors = self._run_test('test_error').errors
    self.assertEqual(1, len(errors))
    # Each entry is indexed as (test, text); only the text is inspected.
    error_text = errors[0][1]
    self.assertContainsRe(
        error_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertContainsRe(error_text, 'this test errored')
1813
def test_skip_has_no_log(self):
1814
result = self._run_test('test_skip')
1815
self.assertEqual(['reason'], result.skip_reasons.keys())
1816
skips = result.skip_reasons['reason']
1817
self.assertEqual(1, len(skips))
1819
self.assertFalse('log' in test.getDetails())
1821
def test_missing_feature_has_no_log(self):
1822
# testtools doesn't know about addNotSupported, so it just gets
1823
# considered as a skip
1824
result = self._run_test('test_missing_feature')
1825
self.assertEqual([missing_feature], result.skip_reasons.keys())
1826
skips = result.skip_reasons[missing_feature]
1827
self.assertEqual(1, len(skips))
1829
self.assertFalse('log' in test.getDetails())
1831
def test_xfail_has_no_log(self):
    """The expected failure recorded for 'test_xfail' omits the log."""
    expected = self._run_test('test_xfail').expectedFailures
    self.assertEqual(1, len(expected))
    xfail_text = expected[0][1]
    # Neither the log attachment header nor the logged message may show.
    self.assertNotContainsRe(
        xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(xfail_text, 'test with expected failure')
1839
def test_unexpected_success_has_log(self):
    """The test recorded as an unexpected success keeps a 'log' detail."""
    result = self._run_test('test_unexpected_success')
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # Inconsistency, unexpectedSuccesses is a list of tests,
    # expectedFailures is a list of reasons?
    test = result.unexpectedSuccesses[0]
    details = test.getDetails()
    # assertIn reports the available keys on failure, unlike
    # assertTrue('log' in details) which only says "False is not true".
    self.assertIn('log', details)
1849
class TestTestCloning(tests.TestCase):
1850
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1852
def test_cloned_testcase_does_not_share_details(self):
1853
"""A TestCase cloned with clone_test does not share mutable attributes
1854
such as details or cleanups.
1856
class Test(tests.TestCase):
1858
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1859
orig_test = Test('test_foo')
1860
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1861
orig_test.run(unittest.TestResult())
1862
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1863
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1865
def test_double_apply_scenario_preserves_first_scenario(self):
1866
"""Applying two levels of scenarios to a test preserves the attributes
1867
added by both scenarios.
1869
class Test(tests.TestCase):
1872
test = Test('test_foo')
1873
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1874
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1875
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1876
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1877
all_tests = list(tests.iter_suite_tests(suite))
1878
self.assertLength(4, all_tests)
1879
all_xys = sorted((t.x, t.y) for t in all_tests)
1880
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1883
# NB: Don't delete this; it's not actually from 0.11!
1659
1884
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2193
load_list='missing file name', list_only=True)
2196
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2198
_test_needs_features = [features.subunit]
2200
def run_subunit_stream(self, test_name):
2201
from subunit import ProtocolTestCase
2203
return TestUtil.TestSuite([_get_test(test_name)])
2204
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2205
test_suite_factory=factory)
2206
test = ProtocolTestCase(stream)
2207
result = testtools.TestResult()
2209
content = stream.getvalue()
2210
return content, result
2212
def test_fail_has_log(self):
    """A failing test's subunit stream has a log section and the message."""
    stream_text, outcome = self.run_subunit_stream('test_fail')
    failure_count = len(outcome.failures)
    self.assertEqual(1, failure_count)
    self.assertContainsRe(stream_text, '(?m)^log$')
    self.assertContainsRe(stream_text, 'this test will fail')
2218
def test_error_has_log(self):
    """An erroring test's subunit stream has a log section and the message."""
    # Only the raw stream is inspected here; the parsed result is unused.
    stream_text, _result = self.run_subunit_stream('test_error')
    for pattern in ('(?m)^log$', 'this test errored'):
        self.assertContainsRe(stream_text, pattern)
2223
def test_skip_has_no_log(self):
2224
content, result = self.run_subunit_stream('test_skip')
2225
self.assertNotContainsRe(content, '(?m)^log$')
2226
self.assertNotContainsRe(content, 'this test will be skipped')
2227
self.assertEqual(['reason'], result.skip_reasons.keys())
2228
skips = result.skip_reasons['reason']
2229
self.assertEqual(1, len(skips))
2231
# RemotedTestCase doesn't preserve the "details"
2232
## self.assertFalse('log' in test.getDetails())
2234
def test_missing_feature_has_no_log(self):
2235
content, result = self.run_subunit_stream('test_missing_feature')
2236
self.assertNotContainsRe(content, '(?m)^log$')
2237
self.assertNotContainsRe(content, 'missing the feature')
2238
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2239
skips = result.skip_reasons['_MissingFeature\n']
2240
self.assertEqual(1, len(skips))
2242
# RemotedTestCase doesn't preserve the "details"
2243
## self.assertFalse('log' in test.getDetails())
2245
def test_xfail_has_no_log(self):
    """An expected failure leaves no log in the stream or the result."""
    stream_text, outcome = self.run_subunit_stream('test_xfail')
    # Raw subunit output first ...
    self.assertNotContainsRe(stream_text, '(?m)^log$')
    self.assertNotContainsRe(stream_text, 'test with expected failure')
    # ... then the text attached to the parsed expected failure.
    expected = outcome.expectedFailures
    self.assertEqual(1, len(expected))
    xfail_text = expected[0][1]
    self.assertNotContainsRe(
        xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
    self.assertNotContainsRe(xfail_text, 'test with expected failure')
2255
def test_unexpected_success_has_log(self):
    """An unexpected success emits its log in the subunit stream.

    Also checks that the parsed result records exactly one unexpected
    success, treating that check as an expected failure when the
    installed subunit client implements addUnexpectedSuccess as
    addSuccess.
    """
    content, result = self.run_subunit_stream('test_unexpected_success')
    self.assertContainsRe(content, '(?m)^log$')
    self.assertContainsRe(content, 'test with unexpected success')
    # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
    #                success, if a min version check is added remove this
    from subunit import TestProtocolClient as _Client

    def _underlying(method):
        # Python 2 wraps class-accessed methods and exposes the function
        # as __func__; Python 3 returns the plain function, which has no
        # __func__ attribute.
        return getattr(method, '__func__', method)

    if (_underlying(_Client.addUnexpectedSuccess)
            is _underlying(_Client.addSuccess)):
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(result.unexpectedSuccesses))
    self.assertEqual(1, len(result.unexpectedSuccesses))
    # RemotedTestCase doesn't preserve the "details", so the 'log' detail
    # of the reported test cannot be checked here.
2271
def test_success_has_no_log(self):
    """A passing test runs once and leaves no log in the subunit stream."""
    stream_text, outcome = self.run_subunit_stream('test_success')
    self.assertEqual(1, outcome.testsRun)
    for unwanted in ('(?m)^log$', 'this test succeeds'):
        self.assertNotContainsRe(stream_text, unwanted)
1974
2278
class TestRunBzr(tests.TestCase):
2264
2568
class TestStartBzrSubProcess(tests.TestCase):
2569
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2571
def _subprocess_log_cleanup(self):
2572
"""Inhibits the base version as we don't produce a log file."""
2269
2574
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2575
"""Override the base version to record the command that is run.
2577
From there we can ensure it is correct without spawning a real process.
2271
2579
self.check_popen_state()
2272
2580
self._popen_args = args
2273
2581
self._popen_kwargs = kwargs
2274
2582
raise _DontSpawnProcess()
2584
def check_popen_state(self):
2585
"""Replace to make assertions when popen is called."""
2276
2587
def test_run_bzr_subprocess_no_plugins(self):
    """With no arguments the command is: python, brz path, --no-plugins.

    The command is read from the first positional argument recorded in
    ``self._popen_args``.
    """
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
    command = self._popen_args[0]
    self.assertEqual(sys.executable, command[0])
    self.assertEqual(self.get_brz_path(), command[1])
    self.assertEqual(['--no-plugins'], command[2:])
2283
2594
def test_allow_plugins(self):
2284
2595
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2597
command = self._popen_args[0]
2287
2598
self.assertEqual([], command[2:])
2289
2600
def test_set_env(self):
    """env_changes sets a variable for the child but not for the parent.

    ``check_popen_state`` is replaced with a check for the variable, so
    the assertion inside it runs when ``_popen`` is called.
    """
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    # set in the child
    def check_environment():
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'EXISTANT_ENV_VAR':'set variable'})
    # not set in the parent
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2611
def test_run_bzr_subprocess_env_del(self):
    """run_bzr_subprocess can remove environment variables too."""
    self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
    os.environ['EXISTANT_ENV_VAR'] = 'set variable'
    try:
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR':None})
        # Still set in parent
        self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
    finally:
        # Remove the variable even when an assertion above fails, so it
        # does not stay set in this process's environment.
        if 'EXISTANT_ENV_VAR' in os.environ:
            del os.environ['EXISTANT_ENV_VAR']
2313
2624
def test_env_del_missing(self):
    """Asking to delete a variable that is not set is not an error.

    The variable must be absent before the call and when ``_popen`` is
    called.
    """
    self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    def check_environment():
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
    self.check_popen_state = check_environment
    self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                      env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2632
def test_working_dir(self):
2322
2633
"""Test that we can specify the working dir for the child"""
2354
2664
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
2666
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
2396
feature = tests.Feature()
2397
exception = tests.UnavailableFeature(feature)
2398
self.assertIs(feature, exception.args[0])
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
2411
res = self.callDeprecated(
2412
['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
2413
' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
2414
simple_thunk_feature.available)
2415
self.assertEqual(tests.UnicodeFilename.available(), res)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
2421
feature = tests.ModuleAvailableFeature('bzrlib.tests')
2422
self.assertEqual('bzrlib.tests', feature.module_name)
2423
self.assertEqual('bzrlib.tests', str(feature))
2424
self.assertTrue(feature.available())
2425
self.assertIs(tests, feature.module)
2427
def test_unavailable_module(self):
2428
feature = tests.ModuleAvailableFeature('bzrlib.no_such_module_exists')
2429
self.assertEqual('bzrlib.no_such_module_exists', str(feature))
2430
self.assertFalse(feature.available())
2431
self.assertIs(None, feature.module)
2667
self.assertEqual('brz: interrupted\n', result[1])
2434
2670
class TestSelftestFiltering(tests.TestCase):
2436
2672
def setUp(self):
    """Build ``self.suite`` from the tests in breezy.tests.test_selftest.

    Also stores the ids of the loaded tests in ``self.all_names``.
    """
    # Base setUp must run exactly once, through super().
    super(TestSelftestFiltering, self).setUp()
    self.suite = TestUtil.TestSuite()
    self.loader = TestUtil.TestLoader()
    self.suite.addTest(self.loader.loadTestsFromModule(
        sys.modules['breezy.tests.test_selftest']))
    self.all_names = _test_ids(self.suite)
2444
2680
def test_condition_id_re(self):
    """condition_id_re keeps only tests whose id matches the pattern.

    Filtering the suite on this method's own name must leave exactly
    this test.
    """
    test_name = ('breezy.tests.test_selftest.TestSelftestFiltering.'
        'test_condition_id_re')
    filtered_suite = tests.filter_suite_by_condition(
        self.suite, tests.condition_id_re('test_condition_id_re'))
    self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2687
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2688
test_names = ['breezy.tests.test_selftest.TestSelftestFiltering.'
2453
2689
'test_condition_id_in_list']
2454
2690
id_list = tests.TestIdList(test_names)
2455
2691
filtered_suite = tests.filter_suite_by_condition(
2953
3190
def test_predefined_prefixes(self):
    """The registry resolves each predefined alias to its breezy package."""
    tpr = tests.test_prefix_alias_registry
    # (expected module prefix, alias) pairs, checked in order.
    expectations = [
        ('breezy', 'breezy'),
        ('breezy.doc', 'bd'),
        ('breezy.utils', 'bu'),
        ('breezy.tests', 'bt'),
        ('breezy.tests.blackbox', 'bb'),
        ('breezy.plugins', 'bp'),
    ]
    for expected, alias in expectations:
        self.assertEqual(expected, tpr.resolve_alias(alias))
3200
class TestThreadLeakDetection(tests.TestCase):
3201
"""Ensure when tests leak threads we detect and report it"""
3203
class LeakRecordingResult(tests.ExtendedTestResult):
3205
tests.ExtendedTestResult.__init__(self, BytesIO(), 0, 1)
3207
def _report_thread_leak(self, test, leaks, alive):
3208
self.leaks.append((test, leaks))
3210
def test_testcase_without_addCleanups(self):
3211
"""Check old TestCase instances don't break with leak detection"""
3212
class Test(unittest.TestCase):
3215
result = self.LeakRecordingResult()
3217
result.startTestRun()
3219
result.stopTestRun()
3220
self.assertEqual(result._tests_leaking_threads_count, 0)
3221
self.assertEqual(result.leaks, [])
3223
def test_thread_leak(self):
3224
"""Ensure a thread that outlives the running of a test is reported
3226
Uses a thread that blocks on an event, and is started by the inner
3227
test case. As the thread outlives the inner case's run, it should be
3228
detected as a leak, but the event is then set so that the thread can
3229
be safely joined in cleanup so it's not leaked for real.
3231
event = threading.Event()
3232
thread = threading.Thread(name="Leaker", target=event.wait)
3233
class Test(tests.TestCase):
3234
def test_leak(self):
3236
result = self.LeakRecordingResult()
3237
test = Test("test_leak")
3238
self.addCleanup(thread.join)
3239
self.addCleanup(event.set)
3240
result.startTestRun()
3242
result.stopTestRun()
3243
self.assertEqual(result._tests_leaking_threads_count, 1)
3244
self.assertEqual(result._first_thread_leaker_id, test.id())
3245
self.assertEqual(result.leaks, [(test, {thread})])
3246
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3248
def test_multiple_leaks(self):
3249
"""Check multiple leaks are blamed on the test cases at fault
3251
Same concept as the previous test, but has one inner test method that
3252
leaks two threads, and one that doesn't leak at all.
3254
event = threading.Event()
3255
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3256
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3257
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3258
class Test(tests.TestCase):
3259
def test_first_leak(self):
3261
def test_second_no_leak(self):
3263
def test_third_leak(self):
3266
result = self.LeakRecordingResult()
3267
first_test = Test("test_first_leak")
3268
third_test = Test("test_third_leak")
3269
self.addCleanup(thread_a.join)
3270
self.addCleanup(thread_b.join)
3271
self.addCleanup(thread_c.join)
3272
self.addCleanup(event.set)
3273
result.startTestRun()
3275
[first_test, Test("test_second_no_leak"), third_test]
3277
result.stopTestRun()
3278
self.assertEqual(result._tests_leaking_threads_count, 2)
3279
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3280
self.assertEqual(result.leaks, [
3281
(first_test, {thread_b}),
3282
(third_test, {thread_a, thread_c})])
3283
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3286
class TestPostMortemDebugging(tests.TestCase):
3287
"""Check post mortem debugging works when tests fail or error"""
3289
class TracebackRecordingResult(tests.ExtendedTestResult):
3291
tests.ExtendedTestResult.__init__(self, BytesIO(), 0, 1)
3292
self.postcode = None
3293
def _post_mortem(self, tb=None):
3294
"""Record the code object at the end of the current traceback"""
3295
tb = tb or sys.exc_info()[2]
3298
while next is not None:
3301
self.postcode = tb.tb_frame.f_code
3302
def report_error(self, test, err):
3304
def report_failure(self, test, err):
3307
def test_location_unittest_error(self):
3308
"""Needs right post mortem traceback with erroring unittest case"""
3309
class Test(unittest.TestCase):
3312
result = self.TracebackRecordingResult()
3314
self.assertEqual(result.postcode, Test.runTest.__code__)
3316
def test_location_unittest_failure(self):
3317
"""Needs right post mortem traceback with failing unittest case"""
3318
class Test(unittest.TestCase):
3320
raise self.failureException
3321
result = self.TracebackRecordingResult()
3323
self.assertEqual(result.postcode, Test.runTest.__code__)
3325
def test_location_bt_error(self):
3326
"""Needs right post mortem traceback with erroring breezy.tests case"""
3327
class Test(tests.TestCase):
3328
def test_error(self):
3330
result = self.TracebackRecordingResult()
3331
Test("test_error").run(result)
3332
self.assertEqual(result.postcode, Test.test_error.__code__)
3334
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing breezy.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be that of the method that raised.
    raising_code = Test.test_failure.__code__
    self.assertEqual(recorder.postcode, raising_code)
3343
def test_env_var_triggers_post_mortem(self):
3344
"""Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
3346
result = tests.ExtendedTestResult(BytesIO(), 0, 1)
3347
post_mortem_calls = []
3348
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3349
self.overrideEnv('BRZ_TEST_PDB', None)
3350
result._post_mortem(1)
3351
self.overrideEnv('BRZ_TEST_PDB', 'on')
3352
result._post_mortem(2)
3353
self.assertEqual([2], post_mortem_calls)
2963
3356
class TestRunSuite(tests.TestCase):
2974
3367
calls.append(test)
2975
3368
return tests.ExtendedTestResult(self.stream, self.descriptions,
2976
3369
self.verbosity)
2977
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
3370
tests.run_suite(suite, runner_class=MyRunner, stream=BytesIO())
2978
3371
self.assertLength(1, calls)
3374
class _Selftest(object):
3375
"""Mixin for tests needing full selftest output"""
3377
def _inject_stream_into_subunit(self, stream):
3378
"""To be overridden by subclasses that run tests out of process"""
3380
def _run_selftest(self, **kwargs):
3382
self._inject_stream_into_subunit(sio)
3383
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3384
return sio.getvalue()
3387
class _ForkedSelftest(_Selftest):
3388
"""Mixin for tests needing full selftest output with forked children"""
3390
_test_needs_features = [features.subunit]
3392
def _inject_stream_into_subunit(self, stream):
3393
"""Monkey-patch subunit so the extra output goes to stream not stdout
3395
Some APIs need rewriting so this kind of bogus hackery can be replaced
3396
by passing the stream param from run_tests down into ProtocolTestCase.
3398
from subunit import ProtocolTestCase
3399
_original_init = ProtocolTestCase.__init__
3400
def _init_with_passthrough(self, *args, **kwargs):
3401
_original_init(self, *args, **kwargs)
3402
self._passthrough = stream
3403
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3405
def _run_selftest(self, **kwargs):
3406
# GZ 2011-05-26: Add a PosixSystem feature so this check can go away
3407
if getattr(os, "fork", None) is None:
3408
raise tests.TestNotApplicable("Platform doesn't support forking")
3409
# Make sure the fork code is actually invoked by claiming two cores
3410
self.overrideAttr(osutils, "local_concurrency", lambda: 2)
3411
kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator)
3412
return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3415
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3416
"""Check operation of --parallel=fork selftest option"""
3418
def test_error_in_child_during_fork(self):
3419
"""Error in a forked child during test setup should get reported"""
3420
class Test(tests.TestCase):
3421
def testMethod(self):
3423
# We don't care what, just break something that a child will run
3424
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3425
out = self._run_selftest(test_suite_factory=Test)
3426
# Lines from the tracebacks of the two child processes may be mixed
3427
# together due to the way subunit parses and forwards the streams,
3428
# so permit extra lines between each part of the error output.
3429
self.assertContainsRe(out,
3432
".+ in fork_for_tests\n"
3434
"\s*workaround_zealous_crypto_random\(\)\n"
3439
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3440
"""Check a test case still alive after being run emits a warning"""
3442
class Test(tests.TestCase):
3443
def test_pass(self):
3445
def test_self_ref(self):
3446
self.also_self = self.test_self_ref
3447
def test_skip(self):
3448
self.skipTest("Don't need")
3450
def _get_suite(self):
3451
return TestUtil.TestSuite([
3452
self.Test("test_pass"),
3453
self.Test("test_self_ref"),
3454
self.Test("test_skip"),
3457
def _run_selftest_with_suite(self, **kwargs):
3458
old_flags = tests.selftest_debug_flags
3459
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3460
gc_on = gc.isenabled()
3464
output = self._run_selftest(test_suite_factory=self._get_suite,
3469
tests.selftest_debug_flags = old_flags
3470
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3471
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3474
def test_testsuite(self):
    """Run the suite with no extra selftest options."""
    no_options = {}
    self._run_selftest_with_suite(**no_options)
3477
def test_pattern(self):
    """Run with an include pattern; test_skip must not be in the output."""
    include_re = "test_(?:pass|self_ref)$"
    selftest_output = self._run_selftest_with_suite(pattern=include_re)
    self.assertNotContainsRe(selftest_output, "test_skip")
3481
def test_exclude_pattern(self):
    """Run with an exclude pattern; test_skip must not be in the output."""
    exclude_re = "test_skip$"
    selftest_output = self._run_selftest_with_suite(
        exclude_pattern=exclude_re)
    self.assertNotContainsRe(selftest_output, "test_skip")
3485
def test_random_seed(self):
    """Run the suite with random_seed set to "now"."""
    seed_option = {"random_seed": "now"}
    self._run_selftest_with_suite(**seed_option)
3488
def test_matching_tests_first(self):
    """Run the suite with matching_tests_first and a pattern."""
    options = {
        "matching_tests_first": True,
        "pattern": "test_self_ref$",
    }
    self._run_selftest_with_suite(**options)
3492
def test_starting_with_and_exclude(self):
    """Combine starting_with and exclude_pattern; test_skip is filtered."""
    prefixes = ["bt."]
    selftest_output = self._run_selftest_with_suite(
        starting_with=prefixes, exclude_pattern="test_skip$")
    self.assertNotContainsRe(selftest_output, "test_skip")
3497
def test_additonal_decorator(self):
    """Run the suite with tests.TestDecorator as an extra suite decorator."""
    # The output is not inspected, so the returned value is not bound.
    self._run_selftest_with_suite(
        suite_decorators=[tests.TestDecorator])
3502
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent's selftest helper with the subunit runner."""
        parent = super(TestUncollectedWarningsSubunit, self)
        return parent._run_selftest_with_suite(
            runner_class=tests.SubUnitBzrRunner, **kwargs)
3512
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking

    Adds no methods of its own; behaviour comes from combining
    _ForkedSelftest with TestUncollectedWarnings.
    """
3516
class TestEnvironHandling(tests.TestCase):
3518
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3519
self.assertFalse('MYVAR' in os.environ)
3520
self.overrideEnv('MYVAR', '42')
3521
# We use an embedded test to make sure we fix the _captureVar bug
3522
class Test(tests.TestCase):
3524
# The first call save the 42 value
3525
self.overrideEnv('MYVAR', None)
3526
self.assertEqual(None, os.environ.get('MYVAR'))
3527
# Make sure we can call it twice
3528
self.overrideEnv('MYVAR', None)
3529
self.assertEqual(None, os.environ.get('MYVAR'))
3531
result = tests.TextTestResult(output, 0, 1)
3532
Test('test_me').run(result)
3533
if not result.wasStrictlySuccessful():
3534
self.fail(output.getvalue())
3535
# We get our value back
3536
self.assertEqual('42', os.environ.get('MYVAR'))
3539
class TestIsolatedEnv(tests.TestCase):
3540
"""Test isolating tests from os.environ.
3542
Since we use tests that are already isolated from os.environ a bit of care
3543
should be taken when designing the tests to avoid bootstrap side-effects.
3544
The tests start with an already clean os.environ which allows doing valid
3545
assertions about which variables are present or not and design tests around
3549
class ScratchMonkey(tests.TestCase):
3554
def test_basics(self):
    # BRZ_HOME is declared in isolated_environ with the value None, so it
    # must be absent from os.environ while this test runs.
    self.assertIn('BRZ_HOME', tests.isolated_environ)
    self.assertIsNone(tests.isolated_environ['BRZ_HOME'])
    self.assertNotIn('BRZ_HOME', os.environ)
    # LINES is declared in isolated_environ with the value '25', so
    # os.environ must hold that same value.
    self.assertIn('LINES', tests.isolated_environ)
    self.assertEqual('25', tests.isolated_environ['LINES'])
    self.assertEqual('25', os.environ['LINES'])
3567
def test_injecting_unknown_variable(self):
    # BRZ_HOME is known to be absent from os.environ, so overriding it adds
    # a new entry and restoring must remove that entry again.
    test = self.ScratchMonkey('test_me')
    tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
    self.assertEqual('foo', os.environ['BRZ_HOME'])
    tests.restore_os_environ(test)
    # assertNotIn reports both operands on failure, unlike assertFalse.
    self.assertNotIn('BRZ_HOME', os.environ)
3575
def test_injecting_known_variable(self):
    scratch = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ with the value '25':
    # the override must replace it and the restore must bring '25' back.
    overrides = {'LINES': '42'}
    tests.override_os_environ(scratch, overrides)
    self.assertEqual('42', os.environ['LINES'])
    tests.restore_os_environ(scratch)
    self.assertEqual('25', os.environ['LINES'])
3583
def test_deleting_variable(self):
    test = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ; overriding it with None
    # must remove it and restoring must bring the '25' value back.
    tests.override_os_environ(test, {'LINES': None})
    # assertNotIn reports both operands on failure, unlike assertTrue.
    self.assertNotIn('LINES', os.environ)
    tests.restore_os_environ(test)
    self.assertEqual('25', os.environ['LINES'])
3592
class TestDocTestSuiteIsolation(tests.TestCase):
3593
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3595
Since tests.TestCase already provides an isolation from os.environ, we use
3596
the clean environment as a base for testing. To precisely capture the
3597
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3600
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3601
not `os.environ` so each test overrides it to suit its needs.
3605
def get_doctest_suite_for_string(self, klass, string):
3606
class Finder(doctest.DocTestFinder):
3608
def find(*args, **kwargs):
3609
test = doctest.DocTestParser().get_doctest(
3610
string, {}, 'foo', 'foo.py', 0)
3613
suite = klass(test_finder=Finder())
3616
def run_doctest_suite_for_string(self, klass, string):
3617
suite = self.get_doctest_suite_for_string(klass, string)
3619
result = tests.TextTestResult(output, 0, 1)
3621
return result, output
3623
def assertDocTestStringSucceds(self, klass, string):
    """Fail with the captured output unless the run is strictly successful.

    :param klass: Passed unchanged to run_doctest_suite_for_string.
    :param string: Passed unchanged to run_doctest_suite_for_string.
    """
    outcome, log = self.run_doctest_suite_for_string(klass, string)
    if outcome.wasStrictlySuccessful():
        return
    self.fail(log.getvalue())
3628
def assertDocTestStringFails(self, klass, string):
    """Fail with the captured output if the run is strictly successful.

    :param klass: Passed unchanged to run_doctest_suite_for_string.
    :param string: Passed unchanged to run_doctest_suite_for_string.
    """
    outcome, log = self.run_doctest_suite_for_string(klass, string)
    if not outcome.wasStrictlySuccessful():
        return
    self.fail(log.getvalue())
3633
def test_injected_variable(self):
3634
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3637
>>> os.environ['LINES']
3640
# doctest.DocTestSuite fails as it sees '25'
3641
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3642
# tests.DocTestSuite sees '42'
3643
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3645
def test_deleted_variable(self):
3646
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3649
>>> os.environ.get('LINES')
3651
# doctest.DocTestSuite fails as it sees '25'
3652
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3653
# tests.DocTestSuite sees None
3654
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3657
class TestSelftestExcludePatterns(tests.TestCase):
3660
super(TestSelftestExcludePatterns, self).setUp()
3661
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3663
def suite_factory(self, keep_only=None, starting_with=None):
3664
"""A test suite factory with only a few tests."""
3665
class Test(tests.TestCase):
3667
# We don't need the full class path
3668
return self._testMethodName
3675
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3677
def assertTestList(self, expected, *selftest_args):
    """Check the lines output by ``selftest --list`` against expected.

    :param expected: List of lines the command output should split into.
    :param selftest_args: Extra arguments appended to the command.
    """
    # The command is really run: this relies on setUp having installed the
    # right test suite factory, so the whole test suite is not loaded.
    command = ('selftest', '--list') + selftest_args
    out, _ = self.run_bzr(command)
    self.assertEqual(expected, out.splitlines())
3684
def test_full_list(self):
    # No exclusion given: all three names are expected in the listing.
    everything = ['a', 'b', 'c']
    self.assertTestList(everything)
3687
def test_single_exclude(self):
    # A single -x option naming 'a' leaves 'b' and 'c' in the listing.
    remaining = ['b', 'c']
    self.assertTestList(remaining, '-x', 'a')
3690
def test_mutiple_excludes(self):
    # Two -x options, naming 'a' and 'b', leave only 'c' in the listing.
    remaining = ['c']
    self.assertTestList(remaining, '-x', 'a', '-x', 'b')
3694
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3696
_test_needs_features = [features.subunit]
3699
super(TestCounterHooks, self).setUp()
3700
class Test(tests.TestCase):
3703
super(Test, self).setUp()
3704
self.hooks = hooks.Hooks()
3705
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3706
self.install_counter_hook(self.hooks, 'myhook')
3711
def run_hook_once(self):
3712
for hook in self.hooks['myhook']:
3715
self.test_class = Test
3717
def assertHookCalls(self, expected_calls, test_name):
3718
test = self.test_class(test_name)
3719
result = unittest.TestResult()
3721
self.assertTrue(hasattr(test, '_counters'))
3722
self.assertTrue('myhook' in test._counters)
3723
self.assertEqual(expected_calls, test._counters['myhook'])
3725
def test_no_hook(self):
    # The 'no_hook' test is expected to record zero calls of the hook.
    expected_calls = 0
    self.assertHookCalls(expected_calls, 'no_hook')
3728
def test_run_hook_once(self):
    # Skip on a testtools older than 0.9.8; the skip message names
    # addDetail as the reason for that requirement.
    minimum = (0, 9, 8)
    installed = features.testtools.module.__version__
    if installed < minimum:
        raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
    # The 'run_hook_once' test is expected to record exactly one call.
    self.assertHookCalls(1, 'run_hook_once')