603
650
def test_dangling_locks_cause_failures(self):
604
651
class TestDanglingLock(tests.TestCaseWithMemoryTransport):
605
652
def test_function(self):
606
t = self.get_transport('.')
653
t = self.get_transport_from_path('.')
607
654
l = lockdir.LockDir(t, 'lock')
610
657
test = TestDanglingLock('test_function')
611
658
result = test.run()
659
total_failures = result.errors + result.failures
612
660
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
661
self.assertEqual(1, len(total_failures))
615
663
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
665
self.assertEqual(0, len(total_failures))
620
668
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
669
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
671
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
from bzrlib.transport.readonly import ReadonlyTransportDecorator
672
from brzlib.transport.readonly import ReadonlyTransportDecorator
626
673
self.vfs_transport_factory = memory.MemoryServer
627
674
self.transport_readonly_server = None
628
675
# calling get_readonly_transport() constructs a decorator on the url
630
677
url = self.get_readonly_url()
631
678
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
634
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
679
t = transport.get_transport_from_url(url)
680
t2 = transport.get_transport_from_url(url2)
681
self.assertIsInstance(t, ReadonlyTransportDecorator)
682
self.assertIsInstance(t2, ReadonlyTransportDecorator)
636
683
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
685
def test_get_readonly_url_http(self):
639
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
from bzrlib.transport.http import HttpTransportBase
686
from brzlib.tests.http_server import HttpServer
687
from brzlib.transport.http import HttpTransportBase
642
688
self.transport_server = test_server.LocalURLServer
643
689
self.transport_readonly_server = HttpServer
644
690
# calling get_readonly_transport() gives us a HTTP server instance.
645
691
url = self.get_readonly_url()
646
692
url2 = self.get_readonly_url('foo/bar')
647
693
# the transport returned may be any HttpTransportBase subclass
648
t = get_transport(url)
649
t2 = get_transport(url2)
650
self.failUnless(isinstance(t, HttpTransportBase))
651
self.failUnless(isinstance(t2, HttpTransportBase))
694
t = transport.get_transport_from_url(url)
695
t2 = transport.get_transport_from_url(url2)
696
self.assertIsInstance(t, HttpTransportBase)
697
self.assertIsInstance(t2, HttpTransportBase)
652
698
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
654
700
def test_is_directory(self):
1643
1684
class Test(tests.TestCase):
1645
1686
def setUp(self):
1646
tests.TestCase.setUp(self)
1687
super(Test, self).setUp()
1647
1688
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1649
1690
def test_value(self):
1650
1691
self.assertEqual('original', self.orig)
1651
1692
self.assertEqual('modified', obj.test_attr)
1653
test = Test('test_value')
1654
test.run(unittest.TestResult())
1694
self._run_successful_test(Test('test_value'))
1655
1695
self.assertEqual('original', obj.test_attr)
1697
def test_overrideAttr_with_no_existing_value_and_value(self):
1698
# Do not define the test_attribute
1699
obj = self # Make 'obj' visible to the embedded test
1700
class Test(tests.TestCase):
1703
tests.TestCase.setUp(self)
1704
self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1706
def test_value(self):
1707
self.assertEqual(tests._unitialized_attr, self.orig)
1708
self.assertEqual('modified', obj.test_attr)
1710
self._run_successful_test(Test('test_value'))
1711
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1713
def test_overrideAttr_with_no_existing_value_and_no_value(self):
1714
# Do not define the test_attribute
1715
obj = self # Make 'obj' visible to the embedded test
1716
class Test(tests.TestCase):
1719
tests.TestCase.setUp(self)
1720
self.orig = self.overrideAttr(obj, 'test_attr')
1722
def test_value(self):
1723
self.assertEqual(tests._unitialized_attr, self.orig)
1724
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1726
self._run_successful_test(Test('test_value'))
1727
self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1729
def test_recordCalls(self):
1730
from brzlib.tests import test_selftest
1731
calls = self.recordCalls(
1732
test_selftest, '_add_numbers')
1733
self.assertEqual(test_selftest._add_numbers(2, 10),
1735
self.assertEqual(calls, [((2, 10), {})])
1738
def _add_numbers(a, b):
1742
class _MissingFeature(features.Feature):
1745
missing_feature = _MissingFeature()
1748
def _get_test(name):
1749
"""Get an instance of a specific example test.
1751
We protect this in a function so that they don't auto-run in the test
1755
class ExampleTests(tests.TestCase):
1757
def test_fail(self):
1758
mutter('this was a failing test')
1759
self.fail('this test will fail')
1761
def test_error(self):
1762
mutter('this test errored')
1763
raise RuntimeError('gotcha')
1765
def test_missing_feature(self):
1766
mutter('missing the feature')
1767
self.requireFeature(missing_feature)
1769
def test_skip(self):
1770
mutter('this test will be skipped')
1771
raise tests.TestSkipped('reason')
1773
def test_success(self):
1774
mutter('this test succeeds')
1776
def test_xfail(self):
1777
mutter('test with expected failure')
1778
self.knownFailure('this_fails')
1780
def test_unexpected_success(self):
1781
mutter('test with unexpected success')
1782
self.expectFailure('should_fail', lambda: None)
1784
return ExampleTests(name)
1787
class TestTestCaseLogDetails(tests.TestCase):
1789
def _run_test(self, test_name):
1790
test = _get_test(test_name)
1791
result = testtools.TestResult()
1795
def test_fail_has_log(self):
    """The report recorded for the failing example test includes its log."""
    outcome = self._run_test('test_fail')
    recorded = outcome.failures
    self.assertEqual(1, len(recorded))
    # The text inspected is the second element of the first recorded entry.
    report_text = recorded[0][1]
    expected_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'this was a failing test',
        )
    for pattern in expected_patterns:
        self.assertContainsRe(report_text, pattern)
1803
def test_error_has_log(self):
    """The report recorded for the erroring example test includes its log."""
    outcome = self._run_test('test_error')
    recorded = outcome.errors
    self.assertEqual(1, len(recorded))
    # The text inspected is the second element of the first recorded entry.
    report_text = recorded[0][1]
    expected_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'this test errored',
        )
    for pattern in expected_patterns:
        self.assertContainsRe(report_text, pattern)
1811
def test_skip_has_no_log(self):
1812
result = self._run_test('test_skip')
1813
self.assertEqual(['reason'], result.skip_reasons.keys())
1814
skips = result.skip_reasons['reason']
1815
self.assertEqual(1, len(skips))
1817
self.assertFalse('log' in test.getDetails())
1819
def test_missing_feature_has_no_log(self):
1820
# testtools doesn't know about addNotSupported, so it just gets
1821
# considered as a skip
1822
result = self._run_test('test_missing_feature')
1823
self.assertEqual([missing_feature], result.skip_reasons.keys())
1824
skips = result.skip_reasons[missing_feature]
1825
self.assertEqual(1, len(skips))
1827
self.assertFalse('log' in test.getDetails())
1829
def test_xfail_has_no_log(self):
    """The report for the expected-failure example test omits the log."""
    outcome = self._run_test('test_xfail')
    recorded = outcome.expectedFailures
    self.assertEqual(1, len(recorded))
    # The text inspected is the second element of the first recorded entry.
    report_text = recorded[0][1]
    forbidden_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'test with expected failure',
        )
    for pattern in forbidden_patterns:
        self.assertNotContainsRe(report_text, pattern)
1837
def test_unexpected_success_has_log(self):
    """The unexpectedly succeeding example test keeps a 'log' detail."""
    outcome = self._run_test('test_unexpected_success')
    surprises = outcome.unexpectedSuccesses
    self.assertEqual(1, len(surprises))
    # Inconsistency: entries of unexpectedSuccesses are used as test
    # objects here, whereas expectedFailures entries are indexed for
    # their report text.
    succeeded_case = surprises[0]
    self.assertTrue('log' in succeeded_case.getDetails())
1847
class TestTestCloning(tests.TestCase):
1848
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1850
def test_cloned_testcase_does_not_share_details(self):
1851
"""A TestCase cloned with clone_test does not share mutable attributes
1852
such as details or cleanups.
1854
class Test(tests.TestCase):
1856
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1857
orig_test = Test('test_foo')
1858
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1859
orig_test.run(unittest.TestResult())
1860
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1861
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1863
def test_double_apply_scenario_preserves_first_scenario(self):
1864
"""Applying two levels of scenarios to a test preserves the attributes
1865
added by both scenarios.
1867
class Test(tests.TestCase):
1870
test = Test('test_foo')
1871
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1872
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1873
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1874
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1875
all_tests = list(tests.iter_suite_tests(suite))
1876
self.assertLength(4, all_tests)
1877
all_xys = sorted((t.x, t.y) for t in all_tests)
1878
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1881
# NB: Don't delete this; it's not actually from 0.11!
1659
1882
@deprecated_function(deprecated_in((0, 11, 0)))
1971
2191
load_list='missing file name', list_only=True)
2194
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2196
_test_needs_features = [features.subunit]
2198
def run_subunit_stream(self, test_name):
2199
from subunit import ProtocolTestCase
2201
return TestUtil.TestSuite([_get_test(test_name)])
2202
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2203
test_suite_factory=factory)
2204
test = ProtocolTestCase(stream)
2205
result = testtools.TestResult()
2207
content = stream.getvalue()
2208
return content, result
2210
def test_fail_has_log(self):
    """The subunit stream of the failing example has a log line and message."""
    stream_text, outcome = self.run_subunit_stream('test_fail')
    failure_count = len(outcome.failures)
    self.assertEqual(1, failure_count)
    for pattern in ('(?m)^log$', 'this test will fail'):
        self.assertContainsRe(stream_text, pattern)
2216
def test_error_has_log(self):
    """The subunit stream of the erroring example has a log line and message."""
    # Only the raw stream text is checked; the parsed result is not used.
    stream_text, _outcome = self.run_subunit_stream('test_error')
    for pattern in ('(?m)^log$', 'this test errored'):
        self.assertContainsRe(stream_text, pattern)
2221
def test_skip_has_no_log(self):
2222
content, result = self.run_subunit_stream('test_skip')
2223
self.assertNotContainsRe(content, '(?m)^log$')
2224
self.assertNotContainsRe(content, 'this test will be skipped')
2225
self.assertEqual(['reason'], result.skip_reasons.keys())
2226
skips = result.skip_reasons['reason']
2227
self.assertEqual(1, len(skips))
2229
# RemotedTestCase doesn't preserve the "details"
2230
## self.assertFalse('log' in test.getDetails())
2232
def test_missing_feature_has_no_log(self):
2233
content, result = self.run_subunit_stream('test_missing_feature')
2234
self.assertNotContainsRe(content, '(?m)^log$')
2235
self.assertNotContainsRe(content, 'missing the feature')
2236
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2237
skips = result.skip_reasons['_MissingFeature\n']
2238
self.assertEqual(1, len(skips))
2240
# RemotedTestCase doesn't preserve the "details"
2241
## self.assertFalse('log' in test.getDetails())
2243
def test_xfail_has_no_log(self):
    """Neither the subunit stream nor the xfail report carries the log."""
    stream_text, outcome = self.run_subunit_stream('test_xfail')
    for pattern in ('(?m)^log$', 'test with expected failure'):
        self.assertNotContainsRe(stream_text, pattern)
    recorded = outcome.expectedFailures
    self.assertEqual(1, len(recorded))
    # The text inspected is the second element of the first recorded entry.
    report_text = recorded[0][1]
    forbidden_patterns = (
        '(?m)^(?:Text attachment: )?log(?:$|: )',
        'test with expected failure',
        )
    for pattern in forbidden_patterns:
        self.assertNotContainsRe(report_text, pattern)
2253
def test_unexpected_success_has_log(self):
    """The subunit stream of an unexpected success carries the log."""
    stream_text, outcome = self.run_subunit_stream('test_unexpected_success')
    for pattern in ('(?m)^log$', 'test with unexpected success'):
        self.assertContainsRe(stream_text, pattern)
    # GZ 2011-05-18: Old versions of subunit treat unexpected success as a
    #                success, if a min version check is added remove this
    from subunit import TestProtocolClient as _Client
    unexpected_impl = _Client.addUnexpectedSuccess.im_func
    success_impl = _Client.addSuccess.im_func
    if unexpected_impl is success_impl:
        # Both names share one implementation, so the count check below is
        # declared as an expected failure.
        self.expectFailure('subunit treats "unexpectedSuccess"'
                           ' as a plain success',
            self.assertEqual, 1, len(outcome.unexpectedSuccesses))
    self.assertEqual(1, len(outcome.unexpectedSuccesses))
    remoted_case = outcome.unexpectedSuccesses[0]
    # RemotedTestCase doesn't preserve the "details", so the presence of
    # 'log' in remoted_case.getDetails() is deliberately not asserted.
2269
def test_success_has_no_log(self):
    """A passing test runs once and leaves no log in the subunit stream."""
    stream_text, outcome = self.run_subunit_stream('test_success')
    self.assertEqual(1, outcome.testsRun)
    for pattern in ('(?m)^log$', 'this test succeeds'):
        self.assertNotContainsRe(stream_text, pattern)
1974
2276
class TestRunBzr(tests.TestCase):
2264
2566
class TestStartBzrSubProcess(tests.TestCase):
2567
"""Stub test start_bzr_subprocess."""
2266
def check_popen_state(self):
2267
"""Replace to make assertions when popen is called."""
2569
def _subprocess_log_cleanup(self):
2570
"""Inhibits the base version as we don't produce a log file."""
2269
2572
def _popen(self, *args, **kwargs):
2270
"""Record the command that is run, so that we can ensure it is correct"""
2573
"""Override the base version to record the command that is run.
2575
From there we can ensure it is correct without spawning a real process.
2271
2577
self.check_popen_state()
2272
2578
self._popen_args = args
2273
2579
self._popen_kwargs = kwargs
2274
2580
raise _DontSpawnProcess()
2582
def check_popen_state(self):
2583
"""Replace to make assertions when popen is called."""
2276
2585
def test_run_bzr_subprocess_no_plugins(self):
2277
2586
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2278
2587
command = self._popen_args[0]
2279
2588
self.assertEqual(sys.executable, command[0])
2280
self.assertEqual(self.get_bzr_path(), command[1])
2589
self.assertEqual(self.get_brz_path(), command[1])
2281
2590
self.assertEqual(['--no-plugins'], command[2:])
2283
2592
def test_allow_plugins(self):
2284
2593
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2286
2595
command = self._popen_args[0]
2287
2596
self.assertEqual([], command[2:])
2289
2598
def test_set_env(self):
2290
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2599
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2291
2600
# set in the child
2292
2601
def check_environment():
2293
2602
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2294
2603
self.check_popen_state = check_environment
2295
2604
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2296
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2605
env_changes={'EXISTANT_ENV_VAR':'set variable'})
2297
2606
# not set in the parent
2298
2607
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2300
2609
def test_run_bzr_subprocess_env_del(self):
2301
2610
"""run_bzr_subprocess can remove environment variables too."""
2302
self.failIf('EXISTANT_ENV_VAR' in os.environ)
2611
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2303
2612
def check_environment():
2304
2613
self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2305
2614
os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2306
2615
self.check_popen_state = check_environment
2307
2616
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2308
env_changes={'EXISTANT_ENV_VAR':None})
2617
env_changes={'EXISTANT_ENV_VAR':None})
2309
2618
# Still set in parent
2310
2619
self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2311
2620
del os.environ['EXISTANT_ENV_VAR']
2313
2622
def test_env_del_missing(self):
2314
self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2623
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2315
2624
def check_environment():
2316
2625
self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2317
2626
self.check_popen_state = check_environment
2318
2627
self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2319
env_changes={'NON_EXISTANT_ENV_VAR':None})
2628
env_changes={'NON_EXISTANT_ENV_VAR':None})
2321
2630
def test_working_dir(self):
2322
2631
"""Test that we can specify the working dir for the child"""
2354
2662
result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2356
2664
self.assertEqual('', result[0])
2357
self.assertEqual('bzr: interrupted\n', result[1])
2360
class TestFeature(tests.TestCase):
2362
def test_caching(self):
2363
"""Feature._probe is called by the feature at most once."""
2364
class InstrumentedFeature(tests.Feature):
2366
super(InstrumentedFeature, self).__init__()
2369
self.calls.append('_probe')
2371
feature = InstrumentedFeature()
2373
self.assertEqual(['_probe'], feature.calls)
2375
self.assertEqual(['_probe'], feature.calls)
2377
def test_named_str(self):
2378
"""Feature.__str__ should thunk to feature_name()."""
2379
class NamedFeature(tests.Feature):
2380
def feature_name(self):
2382
feature = NamedFeature()
2383
self.assertEqual('symlinks', str(feature))
2385
def test_default_str(self):
2386
"""Feature.__str__ should default to __class__.__name__."""
2387
class NamedFeature(tests.Feature):
2389
feature = NamedFeature()
2390
self.assertEqual('NamedFeature', str(feature))
2393
class TestUnavailableFeature(tests.TestCase):
2395
def test_access_feature(self):
    """UnavailableFeature exposes the feature it was built with as args[0]."""
    probe = tests.Feature()
    error = tests.UnavailableFeature(probe)
    first_arg = error.args[0]
    self.assertIs(probe, first_arg)
2401
simple_thunk_feature = tests._CompatabilityThunkFeature(
2402
deprecated_in((2, 1, 0)),
2403
'bzrlib.tests.test_selftest',
2404
'simple_thunk_feature','UnicodeFilename',
2405
replacement_module='bzrlib.tests'
2408
class Test_CompatibilityFeature(tests.TestCase):
2410
def test_does_thunk(self):
    """The thunk's available() warns and matches tests.UnicodeFilename."""
    deprecation_message = (
        'bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
        ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.')
    thunk_answer = self.callDeprecated(
        [deprecation_message], simple_thunk_feature.available)
    self.assertEqual(tests.UnicodeFilename.available(), thunk_answer)
2418
class TestModuleAvailableFeature(tests.TestCase):
2420
def test_available_module(self):
    """A feature for an importable module reports its name and the module."""
    dotted_name = 'bzrlib.tests'
    probe = tests.ModuleAvailableFeature(dotted_name)
    self.assertEqual(dotted_name, probe.module_name)
    self.assertEqual(dotted_name, str(probe))
    is_available = probe.available()
    self.assertTrue(is_available)
    self.assertIs(tests, probe.module)
2427
def test_unavailable_module(self):
    """A feature for a missing module is unavailable and has no module."""
    dotted_name = 'bzrlib.no_such_module_exists'
    probe = tests.ModuleAvailableFeature(dotted_name)
    self.assertEqual(dotted_name, str(probe))
    is_available = probe.available()
    self.assertFalse(is_available)
    self.assertIs(None, probe.module)
2665
self.assertEqual('brz: interrupted\n', result[1])
2434
2668
class TestSelftestFiltering(tests.TestCase):
2436
2670
def setUp(self):
2437
tests.TestCase.setUp(self)
2671
super(TestSelftestFiltering, self).setUp()
2438
2672
self.suite = TestUtil.TestSuite()
2439
2673
self.loader = TestUtil.TestLoader()
2440
2674
self.suite.addTest(self.loader.loadTestsFromModule(
2441
sys.modules['bzrlib.tests.test_selftest']))
2675
sys.modules['brzlib.tests.test_selftest']))
2442
2676
self.all_names = _test_ids(self.suite)
2444
2678
def test_condition_id_re(self):
2445
test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.'
2679
test_name = ('brzlib.tests.test_selftest.TestSelftestFiltering.'
2446
2680
'test_condition_id_re')
2447
2681
filtered_suite = tests.filter_suite_by_condition(
2448
2682
self.suite, tests.condition_id_re('test_condition_id_re'))
2449
2683
self.assertEqual([test_name], _test_ids(filtered_suite))
2451
2685
def test_condition_id_in_list(self):
2452
test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.'
2686
test_names = ['brzlib.tests.test_selftest.TestSelftestFiltering.'
2453
2687
'test_condition_id_in_list']
2454
2688
id_list = tests.TestIdList(test_names)
2455
2689
filtered_suite = tests.filter_suite_by_condition(
2953
3188
def test_predefined_prefixes(self):
2954
3189
tpr = tests.test_prefix_alias_registry
2955
self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib'))
2956
self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd'))
2957
self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu'))
2958
self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt'))
2959
self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb'))
2960
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3190
self.assertEqual('brzlib', tpr.resolve_alias('brzlib'))
3191
self.assertEqual('brzlib.doc', tpr.resolve_alias('bd'))
3192
self.assertEqual('brzlib.utils', tpr.resolve_alias('bu'))
3193
self.assertEqual('brzlib.tests', tpr.resolve_alias('bt'))
3194
self.assertEqual('brzlib.tests.blackbox', tpr.resolve_alias('bb'))
3195
self.assertEqual('brzlib.plugins', tpr.resolve_alias('bp'))
3198
class TestThreadLeakDetection(tests.TestCase):
3199
"""Ensure when tests leak threads we detect and report it"""
3201
class LeakRecordingResult(tests.ExtendedTestResult):
3203
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3205
def _report_thread_leak(self, test, leaks, alive):
3206
self.leaks.append((test, leaks))
3208
def test_testcase_without_addCleanups(self):
3209
"""Check old TestCase instances don't break with leak detection"""
3210
class Test(unittest.TestCase):
3213
result = self.LeakRecordingResult()
3215
result.startTestRun()
3217
result.stopTestRun()
3218
self.assertEqual(result._tests_leaking_threads_count, 0)
3219
self.assertEqual(result.leaks, [])
3221
def test_thread_leak(self):
3222
"""Ensure a thread that outlives the running of a test is reported
3224
Uses a thread that blocks on an event, and is started by the inner
3225
test case. As the thread outlives the inner case's run, it should be
3226
detected as a leak, but the event is then set so that the thread can
3227
be safely joined in cleanup so it's not leaked for real.
3229
event = threading.Event()
3230
thread = threading.Thread(name="Leaker", target=event.wait)
3231
class Test(tests.TestCase):
3232
def test_leak(self):
3234
result = self.LeakRecordingResult()
3235
test = Test("test_leak")
3236
self.addCleanup(thread.join)
3237
self.addCleanup(event.set)
3238
result.startTestRun()
3240
result.stopTestRun()
3241
self.assertEqual(result._tests_leaking_threads_count, 1)
3242
self.assertEqual(result._first_thread_leaker_id, test.id())
3243
self.assertEqual(result.leaks, [(test, set([thread]))])
3244
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3246
def test_multiple_leaks(self):
3247
"""Check multiple leaks are blamed on the test cases at fault
3249
Same concept as the previous test, but has one inner test method that
3250
leaks two threads, and one that doesn't leak at all.
3252
event = threading.Event()
3253
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3254
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3255
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3256
class Test(tests.TestCase):
3257
def test_first_leak(self):
3259
def test_second_no_leak(self):
3261
def test_third_leak(self):
3264
result = self.LeakRecordingResult()
3265
first_test = Test("test_first_leak")
3266
third_test = Test("test_third_leak")
3267
self.addCleanup(thread_a.join)
3268
self.addCleanup(thread_b.join)
3269
self.addCleanup(thread_c.join)
3270
self.addCleanup(event.set)
3271
result.startTestRun()
3273
[first_test, Test("test_second_no_leak"), third_test]
3275
result.stopTestRun()
3276
self.assertEqual(result._tests_leaking_threads_count, 2)
3277
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3278
self.assertEqual(result.leaks, [
3279
(first_test, set([thread_b])),
3280
(third_test, set([thread_a, thread_c]))])
3281
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3284
class TestPostMortemDebugging(tests.TestCase):
3285
"""Check post mortem debugging works when tests fail or error"""
3287
class TracebackRecordingResult(tests.ExtendedTestResult):
3289
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3290
self.postcode = None
3291
def _post_mortem(self, tb=None):
3292
"""Record the code object at the end of the current traceback"""
3293
tb = tb or sys.exc_info()[2]
3296
while next is not None:
3299
self.postcode = tb.tb_frame.f_code
3300
def report_error(self, test, err):
3302
def report_failure(self, test, err):
3305
def test_location_unittest_error(self):
3306
"""Needs right post mortem traceback with erroring unittest case"""
3307
class Test(unittest.TestCase):
3310
result = self.TracebackRecordingResult()
3312
self.assertEqual(result.postcode, Test.runTest.func_code)
3314
def test_location_unittest_failure(self):
3315
"""Needs right post mortem traceback with failing unittest case"""
3316
class Test(unittest.TestCase):
3318
raise self.failureException
3319
result = self.TracebackRecordingResult()
3321
self.assertEqual(result.postcode, Test.runTest.func_code)
3323
def test_location_bt_error(self):
3324
"""Needs right post mortem traceback with erroring brzlib.tests case"""
3325
class Test(tests.TestCase):
3326
def test_error(self):
3328
result = self.TracebackRecordingResult()
3329
Test("test_error").run(result)
3330
self.assertEqual(result.postcode, Test.test_error.func_code)
3332
def test_location_bt_failure(self):
    """Needs right post mortem traceback with failing brzlib.tests case"""
    class Test(tests.TestCase):
        def test_failure(self):
            raise self.failureException
    recorder = self.TracebackRecordingResult()
    failing_case = Test("test_failure")
    failing_case.run(recorder)
    # The recorded code object must be that of the failing test method.
    recorded_code = recorder.postcode
    self.assertEqual(recorded_code, Test.test_failure.func_code)
3341
def test_env_var_triggers_post_mortem(self):
3342
"""Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
3344
result = tests.ExtendedTestResult(StringIO(), 0, 1)
3345
post_mortem_calls = []
3346
self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
3347
self.overrideEnv('BZR_TEST_PDB', None)
3348
result._post_mortem(1)
3349
self.overrideEnv('BZR_TEST_PDB', 'on')
3350
result._post_mortem(2)
3351
self.assertEqual([2], post_mortem_calls)
2963
3354
class TestRunSuite(tests.TestCase):
2976
3367
self.verbosity)
2977
3368
tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3369
self.assertLength(1, calls)
3372
class _Selftest(object):
3373
"""Mixin for tests needing full selftest output"""
3375
def _inject_stream_into_subunit(self, stream):
3376
"""To be overridden by subclasses that run tests out of process"""
3378
def _run_selftest(self, **kwargs):
3380
self._inject_stream_into_subunit(sio)
3381
tests.selftest(stream=sio, stop_on_failure=False, **kwargs)
3382
return sio.getvalue()
3385
class _ForkedSelftest(_Selftest):
3386
"""Mixin for tests needing full selftest output with forked children"""
3388
_test_needs_features = [features.subunit]
3390
def _inject_stream_into_subunit(self, stream):
3391
"""Monkey-patch subunit so the extra output goes to stream not stdout
3393
Some APIs need rewriting so this kind of bogus hackery can be replaced
3394
by passing the stream param from run_tests down into ProtocolTestCase.
3396
from subunit import ProtocolTestCase
3397
_original_init = ProtocolTestCase.__init__
3398
def _init_with_passthrough(self, *args, **kwargs):
3399
_original_init(self, *args, **kwargs)
3400
self._passthrough = stream
3401
self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)
3403
def _run_selftest(self, **kwargs):
    """Run the base selftest with the fork decorator appended.

    Raises tests.TestNotApplicable when the os module offers no fork.
    """
    # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
    fork = getattr(os, "fork", None)
    if fork is None:
        raise tests.TestNotApplicable("Platform doesn't support forking")
    # Claim two cores so the forking code path is really exercised
    self.overrideAttr(osutils, "local_concurrency", lambda: 2)
    decorators = kwargs.setdefault("suite_decorators", [])
    decorators.append(tests.fork_decorator)
    return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3413
class TestParallelFork(_ForkedSelftest, tests.TestCase):
3414
"""Check operation of --parallel=fork selftest option"""
3416
def test_error_in_child_during_fork(self):
3417
"""Error in a forked child during test setup should get reported"""
3418
class Test(tests.TestCase):
3419
def testMethod(self):
3421
# We don't care what, just break something that a child will run
3422
self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
3423
out = self._run_selftest(test_suite_factory=Test)
3424
# Lines from the tracebacks of the two child processes may be mixed
3425
# together due to the way subunit parses and forwards the streams,
3426
# so permit extra lines between each part of the error output.
3427
self.assertContainsRe(out,
3430
".+ in fork_for_tests\n"
3432
"\s*workaround_zealous_crypto_random\(\)\n"
3437
class TestUncollectedWarnings(_Selftest, tests.TestCase):
3438
"""Check a test case still alive after being run emits a warning"""
3440
class Test(tests.TestCase):
3441
def test_pass(self):
3443
def test_self_ref(self):
3444
self.also_self = self.test_self_ref
3445
def test_skip(self):
3446
self.skip("Don't need")
3448
def _get_suite(self):
3449
return TestUtil.TestSuite([
3450
self.Test("test_pass"),
3451
self.Test("test_self_ref"),
3452
self.Test("test_skip"),
3455
def _run_selftest_with_suite(self, **kwargs):
3456
old_flags = tests.selftest_debug_flags
3457
tests.selftest_debug_flags = old_flags.union(["uncollected_cases"])
3458
gc_on = gc.isenabled()
3462
output = self._run_selftest(test_suite_factory=self._get_suite,
3467
tests.selftest_debug_flags = old_flags
3468
self.assertNotContainsRe(output, "Uncollected test case.*test_pass")
3469
self.assertContainsRe(output, "Uncollected test case.*test_self_ref")
3472
def test_testsuite(self):
3473
self._run_selftest_with_suite()
3475
def test_pattern(self):
    """Selecting by pattern keeps test_skip out of the selftest output."""
    selection = "test_(?:pass|self_ref)$"
    output = self._run_selftest_with_suite(pattern=selection)
    self.assertNotContainsRe(output, "test_skip")
3479
def test_exclude_pattern(self):
    """Excluding test_skip keeps it out of the selftest output."""
    exclusion = "test_skip$"
    output = self._run_selftest_with_suite(exclude_pattern=exclusion)
    self.assertNotContainsRe(output, "test_skip")
3483
def test_random_seed(self):
3484
self._run_selftest_with_suite(random_seed="now")
3486
def test_matching_tests_first(self):
3487
self._run_selftest_with_suite(matching_tests_first=True,
3488
pattern="test_self_ref$")
3490
def test_starting_with_and_exclude(self):
    """Combining starting_with and exclude_pattern still drops test_skip."""
    selftest_options = {
        "starting_with": ["bt."],
        "exclude_pattern": "test_skip$",
        }
    output = self._run_selftest_with_suite(**selftest_options)
    self.assertNotContainsRe(output, "test_skip")
3495
def test_additonal_decorator(self):
3496
out = self._run_selftest_with_suite(
3497
suite_decorators=[tests.TestDecorator])
3500
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Delegate to the base implementation, forcing the subunit runner."""
        base_run = TestUncollectedWarnings._run_selftest_with_suite
        return base_run(self, runner_class=tests.SubUnitBzrRunner, **kwargs)
3510
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
3511
"""Check warnings from tests staying alive are emitted when forking"""
3514
class TestEnvironHandling(tests.TestCase):
3516
def test_overrideEnv_None_called_twice_doesnt_leak(self):
3517
self.assertFalse('MYVAR' in os.environ)
3518
self.overrideEnv('MYVAR', '42')
3519
# We use an embedded test to make sure we fix the _captureVar bug
3520
class Test(tests.TestCase):
3522
# The first call save the 42 value
3523
self.overrideEnv('MYVAR', None)
3524
self.assertEqual(None, os.environ.get('MYVAR'))
3525
# Make sure we can call it twice
3526
self.overrideEnv('MYVAR', None)
3527
self.assertEqual(None, os.environ.get('MYVAR'))
3529
result = tests.TextTestResult(output, 0, 1)
3530
Test('test_me').run(result)
3531
if not result.wasStrictlySuccessful():
3532
self.fail(output.getvalue())
3533
# We get our value back
3534
self.assertEqual('42', os.environ.get('MYVAR'))
3537
class TestIsolatedEnv(tests.TestCase):
3538
"""Test isolating tests from os.environ.
3540
Since we use tests that are already isolated from os.environ a bit of care
3541
should be taken when designing the tests to avoid bootstrap side-effects.
3542
The tests start with an already clean os.environ which allows doing valid
3543
assertions about which variables are present or not and design tests around
3547
class ScratchMonkey(tests.TestCase):
3552
def test_basics(self):
3553
# Make sure we know the definition of BZR_HOME: not part of os.environ
3554
# for tests.TestCase.
3555
self.assertTrue('BZR_HOME' in tests.isolated_environ)
3556
self.assertEqual(None, tests.isolated_environ['BZR_HOME'])
3557
# Being part of isolated_environ, BZR_HOME should not appear here
3558
self.assertFalse('BZR_HOME' in os.environ)
3559
# Make sure we know the definition of LINES: part of os.environ for
3561
self.assertTrue('LINES' in tests.isolated_environ)
3562
self.assertEqual('25', tests.isolated_environ['LINES'])
3563
self.assertEqual('25', os.environ['LINES'])
3565
def test_injecting_unknown_variable(self):
    """A variable absent from os.environ can be injected, then removed."""
    # BZR_HOME is known to be absent from os.environ
    monkey = self.ScratchMonkey('test_me')
    injected = {'BZR_HOME': 'foo'}
    tests.override_os_environ(monkey, injected)
    self.assertEqual('foo', os.environ['BZR_HOME'])
    # Restoring must remove the variable again.
    tests.restore_os_environ(monkey)
    self.assertFalse('BZR_HOME' in os.environ)
3573
def test_injecting_known_variable(self):
    """Overriding an existing variable is undone by restore_os_environ."""
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    overridden = {'LINES': '42'}
    tests.override_os_environ(monkey, overridden)
    self.assertEqual('42', os.environ['LINES'])
    # Restoring must bring back the value checked before the override.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3581
def test_deleting_variable(self):
    """Overriding with None removes the variable until it is restored."""
    monkey = self.ScratchMonkey('test_me')
    # LINES is known to be present in os.environ
    removed = {'LINES': None}
    tests.override_os_environ(monkey, removed)
    self.assertTrue('LINES' not in os.environ)
    # Restoring must bring the original value back.
    tests.restore_os_environ(monkey)
    self.assertEqual('25', os.environ['LINES'])
3590
class TestDocTestSuiteIsolation(tests.TestCase):
3591
"""Test that `tests.DocTestSuite` isolates doc tests from os.environ.
3593
Since tests.TestCase already provides isolation from os.environ, we use
3594
the clean environment as a base for testing. To precisely capture the
3595
isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
3598
We want to make sure `tests.DocTestSuite` respects `tests.isolated_environ`,
3599
not `os.environ` so each test overrides it to suit its needs.
3603
def get_doctest_suite_for_string(self, klass, string):
3604
class Finder(doctest.DocTestFinder):
3606
def find(*args, **kwargs):
3607
test = doctest.DocTestParser().get_doctest(
3608
string, {}, 'foo', 'foo.py', 0)
3611
suite = klass(test_finder=Finder())
3614
def run_doctest_suite_for_string(self, klass, string):
3615
suite = self.get_doctest_suite_for_string(klass, string)
3617
result = tests.TextTestResult(output, 0, 1)
3619
return result, output
3621
def assertDocTestStringSucceds(self, klass, string):
    """Fail unless the doctest in `string`, run through `klass`, succeeds.

    :param klass: suite class handed to run_doctest_suite_for_string.
    :param string: doctest source text.
    """
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if outcome.wasStrictlySuccessful():
        return
    # Report the captured runner output as the failure message.
    self.fail(stream.getvalue())
3626
def assertDocTestStringFails(self, klass, string):
    """Fail if the doctest in `string`, run through `klass`, succeeds.

    :param klass: suite class handed to run_doctest_suite_for_string.
    :param string: doctest source text.
    """
    outcome, stream = self.run_doctest_suite_for_string(klass, string)
    if not outcome.wasStrictlySuccessful():
        return
    # An unexpected success: report the captured runner output.
    self.fail(stream.getvalue())
3631
def test_injected_variable(self):
3632
self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
3635
>>> os.environ['LINES']
3638
# doctest.DocTestSuite fails as it sees '25'
3639
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3640
# tests.IsolatedDocTestSuite sees '42'
3641
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3643
def test_deleted_variable(self):
3644
self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
3647
>>> os.environ.get('LINES')
3649
# doctest.DocTestSuite fails as it sees '25'
3650
self.assertDocTestStringFails(doctest.DocTestSuite, test)
3651
# tests.IsolatedDocTestSuite sees None
3652
self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
3655
class TestSelftestExcludePatterns(tests.TestCase):
3658
super(TestSelftestExcludePatterns, self).setUp()
3659
self.overrideAttr(tests, 'test_suite', self.suite_factory)
3661
def suite_factory(self, keep_only=None, starting_with=None):
3662
"""A test suite factory with only a few tests."""
3663
class Test(tests.TestCase):
3665
# We don't need the full class path
3666
return self._testMethodName
3673
return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
3675
def assertTestList(self, expected, *selftest_args):
    """Assert that ``selftest --list`` prints exactly the `expected` lines.

    :param expected: list of lines the command is expected to print.
    :param selftest_args: extra command-line arguments appended after
        ``selftest --list``.
    """
    # We rely on setUp installing the right test suite factory so we can
    # test at the command level without loading the whole test suite
    command = ('selftest', '--list') + selftest_args
    stdout, _stderr = self.run_bzr(command)
    self.assertEqual(expected, stdout.splitlines())
3682
def test_full_list(self):
    """Without any ``-x`` option the listing is expected to be a, b, c."""
    every_test = ['a', 'b', 'c']
    self.assertTestList(every_test)
3685
def test_single_exclude(self):
    """A single ``-x a`` is expected to leave b and c in the listing."""
    exclusions = ('-x', 'a')
    self.assertTestList(['b', 'c'], *exclusions)
3688
def test_mutiple_excludes(self):
    """Two ``-x`` options (a and b) are expected to leave only c listed."""
    exclusions = ('-x', 'a', '-x', 'b')
    self.assertTestList(['c'], *exclusions)
3692
class TestCounterHooks(tests.TestCase, SelfTestHelper):
3694
_test_needs_features = [features.subunit]
3697
super(TestCounterHooks, self).setUp()
3698
class Test(tests.TestCase):
3701
super(Test, self).setUp()
3702
self.hooks = hooks.Hooks()
3703
self.hooks.add_hook('myhook', 'Foo bar blah', (2,4))
3704
self.install_counter_hook(self.hooks, 'myhook')
3709
def run_hook_once(self):
3710
for hook in self.hooks['myhook']:
3713
self.test_class = Test
3715
def assertHookCalls(self, expected_calls, test_name):
3716
test = self.test_class(test_name)
3717
result = unittest.TestResult()
3719
self.assertTrue(hasattr(test, '_counters'))
3720
self.assertTrue(test._counters.has_key('myhook'))
3721
self.assertEqual(expected_calls, test._counters['myhook'])
3723
def test_no_hook(self):
    """The 'no_hook' test is expected to record zero 'myhook' calls."""
    self.assertHookCalls(expected_calls=0, test_name='no_hook')
3726
def test_run_hook_once(self):
3727
tt = features.testtools
3728
if tt.module.__version__ < (0, 9, 8):
3729
raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
3730
self.assertHookCalls(1, 'run_hook_once')