312
326
from bzrlib.tests.per_interrepository import make_scenarios
315
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
329
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
scenarios = make_scenarios(server1, server2, formats)
317
331
self.assertEqual([
319
333
{'repository_format': 'C1',
320
334
'repository_format_to': 'C2',
321
335
'transport_readonly_server': 'b',
322
'transport_server': 'a'}),
336
'transport_server': 'a',
337
'extra_setup': 'C3'}),
324
339
{'repository_format': 'D1',
325
340
'repository_format_to': 'D2',
326
341
'transport_readonly_server': 'b',
327
'transport_server': 'a'})],
342
'transport_server': 'a',
343
'extra_setup': 'D3'})],
835
849
self.assertContainsRe(output,
836
850
r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
852
def test_uses_time_from_testtools(self):
853
"""Test case timings in verbose results should use testtools times"""
855
class TimeAddedVerboseTestResult(tests.VerboseTestResult):
856
def startTest(self, test):
857
self.time(datetime.datetime.utcfromtimestamp(1.145))
858
super(TimeAddedVerboseTestResult, self).startTest(test)
859
def addSuccess(self, test):
860
self.time(datetime.datetime.utcfromtimestamp(51.147))
861
super(TimeAddedVerboseTestResult, self).addSuccess(test)
862
def report_tests_starting(self): pass
864
self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
865
self.assertEndsWith(sio.getvalue(), "OK 50002ms\n")
838
867
def test_known_failure(self):
839
868
"""A KnownFailure being raised should trigger several result actions."""
840
869
class InstrumentedTestResult(tests.ExtendedTestResult):
841
870
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
871
def report_tests_starting(self): pass
844
872
def report_known_failure(self, test, err=None, details=None):
845
873
self._call = test, 'known failure'
846
874
result = InstrumentedTestResult(None, None, None, None)
1655
1705
self.assertEqual('original', obj.test_attr)
1708
class _MissingFeature(tests.Feature):
1711
missing_feature = _MissingFeature()
1714
def _get_test(name):
1715
"""Get an instance of a specific example test.
1717
We protect this in a function so that they don't auto-run in the test
1721
class ExampleTests(tests.TestCase):
1723
def test_fail(self):
1724
mutter('this was a failing test')
1725
self.fail('this test will fail')
1727
def test_error(self):
1728
mutter('this test errored')
1729
raise RuntimeError('gotcha')
1731
def test_missing_feature(self):
1732
mutter('missing the feature')
1733
self.requireFeature(missing_feature)
1735
def test_skip(self):
1736
mutter('this test will be skipped')
1737
raise tests.TestSkipped('reason')
1739
def test_success(self):
1740
mutter('this test succeeds')
1742
def test_xfail(self):
1743
mutter('test with expected failure')
1744
self.knownFailure('this_fails')
1746
def test_unexpected_success(self):
1747
mutter('test with unexpected success')
1748
self.expectFailure('should_fail', lambda: None)
1750
return ExampleTests(name)
1753
class TestTestCaseLogDetails(tests.TestCase):
1755
def _run_test(self, test_name):
1756
test = _get_test(test_name)
1757
result = testtools.TestResult()
1761
def test_fail_has_log(self):
1762
result = self._run_test('test_fail')
1763
self.assertEqual(1, len(result.failures))
1764
result_content = result.failures[0][1]
1765
self.assertContainsRe(result_content, 'Text attachment: log')
1766
self.assertContainsRe(result_content, 'this was a failing test')
1768
def test_error_has_log(self):
1769
result = self._run_test('test_error')
1770
self.assertEqual(1, len(result.errors))
1771
result_content = result.errors[0][1]
1772
self.assertContainsRe(result_content, 'Text attachment: log')
1773
self.assertContainsRe(result_content, 'this test errored')
1775
def test_skip_has_no_log(self):
1776
result = self._run_test('test_skip')
1777
self.assertEqual(['reason'], result.skip_reasons.keys())
1778
skips = result.skip_reasons['reason']
1779
self.assertEqual(1, len(skips))
1781
self.assertFalse('log' in test.getDetails())
1783
def test_missing_feature_has_no_log(self):
1784
# testtools doesn't know about addNotSupported, so it just gets
1785
# considered as a skip
1786
result = self._run_test('test_missing_feature')
1787
self.assertEqual([missing_feature], result.skip_reasons.keys())
1788
skips = result.skip_reasons[missing_feature]
1789
self.assertEqual(1, len(skips))
1791
self.assertFalse('log' in test.getDetails())
1793
def test_xfail_has_no_log(self):
1794
result = self._run_test('test_xfail')
1795
self.assertEqual(1, len(result.expectedFailures))
1796
result_content = result.expectedFailures[0][1]
1797
self.assertNotContainsRe(result_content, 'Text attachment: log')
1798
self.assertNotContainsRe(result_content, 'test with expected failure')
1800
def test_unexpected_success_has_log(self):
1801
result = self._run_test('test_unexpected_success')
1802
self.assertEqual(1, len(result.unexpectedSuccesses))
1803
# Inconsistency, unexpectedSuccesses is a list of tests,
1804
# expectedFailures is a list of reasons?
1805
test = result.unexpectedSuccesses[0]
1806
details = test.getDetails()
1807
self.assertTrue('log' in details)
1810
class TestTestCloning(tests.TestCase):
1811
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1813
def test_cloned_testcase_does_not_share_details(self):
1814
"""A TestCase cloned with clone_test does not share mutable attributes
1815
such as details or cleanups.
1817
class Test(tests.TestCase):
1819
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1820
orig_test = Test('test_foo')
1821
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1822
orig_test.run(unittest.TestResult())
1823
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1824
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1826
def test_double_apply_scenario_preserves_first_scenario(self):
1827
"""Applying two levels of scenarios to a test preserves the attributes
1828
added by both scenarios.
1830
class Test(tests.TestCase):
1833
test = Test('test_foo')
1834
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1835
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1836
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1837
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1838
all_tests = list(tests.iter_suite_tests(suite))
1839
self.assertLength(4, all_tests)
1840
all_xys = sorted((t.x, t.y) for t in all_tests)
1841
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1844
# NB: Don't delete this; it's not actually from 0.11!
1659
1845
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1846
def sample_deprecated_function():
1971
2157
load_list='missing file name', list_only=True)
2160
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
2162
_test_needs_features = [features.subunit]
2164
def run_subunit_stream(self, test_name):
2165
from subunit import ProtocolTestCase
2167
return TestUtil.TestSuite([_get_test(test_name)])
2168
stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
2169
test_suite_factory=factory)
2170
test = ProtocolTestCase(stream)
2171
result = testtools.TestResult()
2173
content = stream.getvalue()
2174
return content, result
2176
def test_fail_has_log(self):
2177
content, result = self.run_subunit_stream('test_fail')
2178
self.assertEqual(1, len(result.failures))
2179
self.assertContainsRe(content, '(?m)^log$')
2180
self.assertContainsRe(content, 'this test will fail')
2182
def test_error_has_log(self):
2183
content, result = self.run_subunit_stream('test_error')
2184
self.assertContainsRe(content, '(?m)^log$')
2185
self.assertContainsRe(content, 'this test errored')
2187
def test_skip_has_no_log(self):
2188
content, result = self.run_subunit_stream('test_skip')
2189
self.assertNotContainsRe(content, '(?m)^log$')
2190
self.assertNotContainsRe(content, 'this test will be skipped')
2191
self.assertEqual(['reason'], result.skip_reasons.keys())
2192
skips = result.skip_reasons['reason']
2193
self.assertEqual(1, len(skips))
2195
# RemotedTestCase doesn't preserve the "details"
2196
## self.assertFalse('log' in test.getDetails())
2198
def test_missing_feature_has_no_log(self):
2199
content, result = self.run_subunit_stream('test_missing_feature')
2200
self.assertNotContainsRe(content, '(?m)^log$')
2201
self.assertNotContainsRe(content, 'missing the feature')
2202
self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
2203
skips = result.skip_reasons['_MissingFeature\n']
2204
self.assertEqual(1, len(skips))
2206
# RemotedTestCase doesn't preserve the "details"
2207
## self.assertFalse('log' in test.getDetails())
2209
def test_xfail_has_no_log(self):
2210
content, result = self.run_subunit_stream('test_xfail')
2211
self.assertNotContainsRe(content, '(?m)^log$')
2212
self.assertNotContainsRe(content, 'test with expected failure')
2213
self.assertEqual(1, len(result.expectedFailures))
2214
result_content = result.expectedFailures[0][1]
2215
self.assertNotContainsRe(result_content, 'Text attachment: log')
2216
self.assertNotContainsRe(result_content, 'test with expected failure')
2218
def test_unexpected_success_has_log(self):
2219
content, result = self.run_subunit_stream('test_unexpected_success')
2220
self.assertContainsRe(content, '(?m)^log$')
2221
self.assertContainsRe(content, 'test with unexpected success')
2222
self.expectFailure('subunit treats "unexpectedSuccess"'
2223
' as a plain success',
2224
self.assertEqual, 1, len(result.unexpectedSuccesses))
2225
self.assertEqual(1, len(result.unexpectedSuccesses))
2226
test = result.unexpectedSuccesses[0]
2227
# RemotedTestCase doesn't preserve the "details"
2228
## self.assertTrue('log' in test.getDetails())
2230
def test_success_has_no_log(self):
2231
content, result = self.run_subunit_stream('test_success')
2232
self.assertEqual(1, result.testsRun)
2233
self.assertNotContainsRe(content, '(?m)^log$')
2234
self.assertNotContainsRe(content, 'this test succeeds')
1974
2237
class TestRunBzr(tests.TestCase):
2960
3228
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3231
class TestThreadLeakDetection(tests.TestCase):
3232
"""Ensure when tests leak threads we detect and report it"""
3234
class LeakRecordingResult(tests.ExtendedTestResult):
3236
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3238
def _report_thread_leak(self, test, leaks, alive):
3239
self.leaks.append((test, leaks))
3241
def test_testcase_without_addCleanups(self):
3242
"""Check old TestCase instances don't break with leak detection"""
3243
class Test(unittest.TestCase):
3246
addCleanup = None # for when on Python 2.7 with native addCleanup
3247
result = self.LeakRecordingResult()
3249
self.assertIs(getattr(test, "addCleanup", None), None)
3250
result.startTestRun()
3252
result.stopTestRun()
3253
self.assertEqual(result._tests_leaking_threads_count, 0)
3254
self.assertEqual(result.leaks, [])
3256
def test_thread_leak(self):
3257
"""Ensure a thread that outlives the running of a test is reported
3259
Uses a thread that blocks on an event, and is started by the inner
3260
test case. As the thread outlives the inner case's run, it should be
3261
detected as a leak, but the event is then set so that the thread can
3262
be safely joined in cleanup so it's not leaked for real.
3264
event = threading.Event()
3265
thread = threading.Thread(name="Leaker", target=event.wait)
3266
class Test(tests.TestCase):
3267
def test_leak(self):
3269
result = self.LeakRecordingResult()
3270
test = Test("test_leak")
3271
self.addCleanup(thread.join)
3272
self.addCleanup(event.set)
3273
result.startTestRun()
3275
result.stopTestRun()
3276
self.assertEqual(result._tests_leaking_threads_count, 1)
3277
self.assertEqual(result._first_thread_leaker_id, test.id())
3278
self.assertEqual(result.leaks, [(test, set([thread]))])
3279
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3281
def test_multiple_leaks(self):
3282
"""Check multiple leaks are blamed on the test cases at fault
3284
Same concept as the previous test, but has one inner test method that
3285
leaks two threads, and one that doesn't leak at all.
3287
event = threading.Event()
3288
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3289
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3290
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3291
class Test(tests.TestCase):
3292
def test_first_leak(self):
3294
def test_second_no_leak(self):
3296
def test_third_leak(self):
3299
result = self.LeakRecordingResult()
3300
first_test = Test("test_first_leak")
3301
third_test = Test("test_third_leak")
3302
self.addCleanup(thread_a.join)
3303
self.addCleanup(thread_b.join)
3304
self.addCleanup(thread_c.join)
3305
self.addCleanup(event.set)
3306
result.startTestRun()
3308
[first_test, Test("test_second_no_leak"), third_test]
3310
result.stopTestRun()
3311
self.assertEqual(result._tests_leaking_threads_count, 2)
3312
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3313
self.assertEqual(result.leaks, [
3314
(first_test, set([thread_b])),
3315
(third_test, set([thread_a, thread_c]))])
3316
self.assertContainsString(result.stream.getvalue(), "leaking threads")
2963
3319
class TestRunSuite(tests.TestCase):
2965
3321
def test_runner_class(self):