312
326
from bzrlib.tests.per_interrepository import make_scenarios
315
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
329
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
scenarios = make_scenarios(server1, server2, formats)
317
331
self.assertEqual([
319
333
{'repository_format': 'C1',
320
334
'repository_format_to': 'C2',
321
335
'transport_readonly_server': 'b',
322
'transport_server': 'a'}),
336
'transport_server': 'a',
337
'extra_setup': 'C3'}),
324
339
{'repository_format': 'D1',
325
340
'repository_format_to': 'D2',
326
341
'transport_readonly_server': 'b',
327
'transport_server': 'a'})],
342
'transport_server': 'a',
343
'extra_setup': 'D3'})],
837
853
"""A KnownFailure being raised should trigger several result actions."""
838
854
class InstrumentedTestResult(tests.ExtendedTestResult):
839
855
def stopTestRun(self): pass
840
def startTests(self): pass
841
def report_test_start(self, test): pass
856
def report_tests_starting(self): pass
842
857
def report_known_failure(self, test, err=None, details=None):
843
858
self._call = test, 'known failure'
844
859
result = InstrumentedTestResult(None, None, None, None)
894
909
"""Test the behaviour of invoking addNotSupported."""
895
910
class InstrumentedTestResult(tests.ExtendedTestResult):
896
911
def stopTestRun(self): pass
897
def startTests(self): pass
898
def report_test_start(self, test): pass
912
def report_tests_starting(self): pass
899
913
def report_unsupported(self, test, feature):
900
914
self._call = test, feature
901
915
result = InstrumentedTestResult(None, None, None, None)
940
954
"""An UnavailableFeature being raised should invoke addNotSupported."""
941
955
class InstrumentedTestResult(tests.ExtendedTestResult):
942
956
def stopTestRun(self): pass
943
def startTests(self): pass
944
def report_test_start(self, test): pass
957
def report_tests_starting(self): pass
945
958
def addNotSupported(self, test, feature):
946
959
self._call = test, feature
947
960
result = InstrumentedTestResult(None, None, None, None)
997
1009
test.run(result)
998
1010
self.assertEquals(1, result.calls)
1012
def test_startTests_only_once(self):
1013
"""With multiple tests startTests should still only be called once"""
1014
class InstrumentedTestResult(tests.ExtendedTestResult):
1016
def startTests(self): self.calls += 1
1017
result = InstrumentedTestResult(None, None, None, None)
1018
suite = unittest.TestSuite([
1019
unittest.FunctionTestCase(lambda: None),
1020
unittest.FunctionTestCase(lambda: None)])
1022
self.assertEquals(1, result.calls)
1023
self.assertEquals(2, result.count)
1001
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1214
1236
self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1237
self.assertLength(1, self._get_source_tree_calls)
1239
def test_verbose_test_count(self):
1240
"""A verbose test run reports the right test count at the start"""
1241
suite = TestUtil.TestSuite([
1242
unittest.FunctionTestCase(lambda:None),
1243
unittest.FunctionTestCase(lambda:None)])
1244
self.assertEqual(suite.countTestCases(), 2)
1246
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
# Need to use the CountingDecorator as that's what sets num_tests
1248
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1217
1251
def test_startTestRun(self):
1218
1252
"""run should call result.startTestRun()"""
1758
1792
self.assertTrue('log' in details)
1795
class TestTestCloning(tests.TestCase):
1796
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1798
def test_cloned_testcase_does_not_share_details(self):
1799
"""A TestCase cloned with clone_test does not share mutable attributes
1800
such as details or cleanups.
1802
class Test(tests.TestCase):
1804
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1805
orig_test = Test('test_foo')
1806
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1807
orig_test.run(unittest.TestResult())
1808
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1809
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1811
def test_double_apply_scenario_preserves_first_scenario(self):
1812
"""Applying two levels of scenarios to a test preserves the attributes
1813
added by both scenarios.
1815
class Test(tests.TestCase):
1818
test = Test('test_foo')
1819
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1820
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1821
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1822
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1823
all_tests = list(tests.iter_suite_tests(suite))
1824
self.assertLength(4, all_tests)
1825
all_xys = sorted((t.x, t.y) for t in all_tests)
1826
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1761
1829
# NB: Don't delete this; it's not actually from 0.11!
1762
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1763
1831
def sample_deprecated_function():
3145
3213
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3216
class TestThreadLeakDetection(tests.TestCase):
3217
"""Ensure when tests leak threads we detect and report it"""
3219
class LeakRecordingResult(tests.ExtendedTestResult):
3221
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3223
def _report_thread_leak(self, test, leaks, alive):
3224
self.leaks.append((test, leaks))
3226
def test_testcase_without_addCleanups(self):
3227
"""Check old TestCase instances don't break with leak detection"""
3228
class Test(unittest.TestCase):
3231
addCleanup = None # for when on Python 2.7 with native addCleanup
3232
result = self.LeakRecordingResult()
3234
self.assertIs(getattr(test, "addCleanup", None), None)
3235
result.startTestRun()
3237
result.stopTestRun()
3238
self.assertEqual(result._tests_leaking_threads_count, 0)
3239
self.assertEqual(result.leaks, [])
3241
def test_thread_leak(self):
3242
"""Ensure a thread that outlives the running of a test is reported
3244
Uses a thread that blocks on an event, and is started by the inner
3245
test case. As the thread outlives the inner case's run, it should be
3246
detected as a leak, but the event is then set so that the thread can
3247
be safely joined in cleanup so it's not leaked for real.
3249
event = threading.Event()
3250
thread = threading.Thread(name="Leaker", target=event.wait)
3251
class Test(tests.TestCase):
3252
def test_leak(self):
3254
result = self.LeakRecordingResult()
3255
test = Test("test_leak")
3256
self.addCleanup(thread.join)
3257
self.addCleanup(event.set)
3258
result.startTestRun()
3260
result.stopTestRun()
3261
self.assertEqual(result._tests_leaking_threads_count, 1)
3262
self.assertEqual(result._first_thread_leaker_id, test.id())
3263
self.assertEqual(result.leaks, [(test, set([thread]))])
3264
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3266
def test_multiple_leaks(self):
3267
"""Check multiple leaks are blamed on the test cases at fault
3269
Same concept as the previous test, but has one inner test method that
3270
leaks two threads, and one that doesn't leak at all.
3272
event = threading.Event()
3273
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3274
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3275
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3276
class Test(tests.TestCase):
3277
def test_first_leak(self):
3279
def test_second_no_leak(self):
3281
def test_third_leak(self):
3284
result = self.LeakRecordingResult()
3285
first_test = Test("test_first_leak")
3286
third_test = Test("test_third_leak")
3287
self.addCleanup(thread_a.join)
3288
self.addCleanup(thread_b.join)
3289
self.addCleanup(thread_c.join)
3290
self.addCleanup(event.set)
3291
result.startTestRun()
3293
[first_test, Test("test_second_no_leak"), third_test]
3295
result.stopTestRun()
3296
self.assertEqual(result._tests_leaking_threads_count, 2)
3297
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3298
self.assertEqual(result.leaks, [
3299
(first_test, set([thread_b])),
3300
(third_test, set([thread_a, thread_c]))])
3301
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3148
3304
class TestRunSuite(tests.TestCase):
3150
3306
def test_runner_class(self):