312
326
from bzrlib.tests.per_interrepository import make_scenarios
315
formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
329
formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
scenarios = make_scenarios(server1, server2, formats)
317
331
self.assertEqual([
319
333
{'repository_format': 'C1',
320
334
'repository_format_to': 'C2',
321
335
'transport_readonly_server': 'b',
322
'transport_server': 'a'}),
336
'transport_server': 'a',
337
'extra_setup': 'C3'}),
324
339
{'repository_format': 'D1',
325
340
'repository_format_to': 'D2',
326
341
'transport_readonly_server': 'b',
327
'transport_server': 'a'})],
342
'transport_server': 'a',
343
'extra_setup': 'D3'})],
610
626
test = TestDanglingLock('test_function')
611
627
result = test.run()
628
total_failures = result.errors + result.failures
612
629
if self._lock_check_thorough:
613
self.assertEqual(1, len(result.errors))
630
self.assertLength(1, total_failures)
615
632
# When _lock_check_thorough is disabled, then we don't trigger a
617
self.assertEqual(0, len(result.errors))
634
self.assertLength(0, total_failures)
620
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
638
"""Tests for the convenience functions TestCaseWithTransport introduces."""
623
640
def test_get_readonly_url_none(self):
624
from bzrlib.transport import get_transport
625
641
from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
642
self.vfs_transport_factory = memory.MemoryServer
627
643
self.transport_readonly_server = None
630
646
url = self.get_readonly_url()
631
647
url2 = self.get_readonly_url('foo/bar')
632
t = get_transport(url)
633
t2 = get_transport(url2)
648
t = transport.get_transport(url)
649
t2 = transport.get_transport(url2)
634
650
self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
651
self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
652
self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
638
654
def test_get_readonly_url_http(self):
639
655
from bzrlib.tests.http_server import HttpServer
640
from bzrlib.transport import get_transport
641
656
from bzrlib.transport.http import HttpTransportBase
642
657
self.transport_server = test_server.LocalURLServer
643
658
self.transport_readonly_server = HttpServer
839
853
"""A KnownFailure being raised should trigger several result actions."""
840
854
class InstrumentedTestResult(tests.ExtendedTestResult):
841
855
def stopTestRun(self): pass
842
def startTests(self): pass
843
def report_test_start(self, test): pass
856
def report_tests_starting(self): pass
844
857
def report_known_failure(self, test, err=None, details=None):
845
858
self._call = test, 'known failure'
846
859
result = InstrumentedTestResult(None, None, None, None)
893
909
"""Test the behaviour of invoking addNotSupported."""
894
910
class InstrumentedTestResult(tests.ExtendedTestResult):
895
911
def stopTestRun(self): pass
896
def startTests(self): pass
897
def report_test_start(self, test): pass
912
def report_tests_starting(self): pass
898
913
def report_unsupported(self, test, feature):
899
914
self._call = test, feature
900
915
result = InstrumentedTestResult(None, None, None, None)
939
954
"""An UnavailableFeature being raised should invoke addNotSupported."""
940
955
class InstrumentedTestResult(tests.ExtendedTestResult):
941
956
def stopTestRun(self): pass
942
def startTests(self): pass
943
def report_test_start(self, test): pass
957
def report_tests_starting(self): pass
944
958
def addNotSupported(self, test, feature):
945
959
self._call = test, feature
946
960
result = InstrumentedTestResult(None, None, None, None)
996
1009
test.run(result)
997
1010
self.assertEquals(1, result.calls)
1012
def test_startTests_only_once(self):
1013
"""With multiple tests startTests should still only be called once"""
1014
class InstrumentedTestResult(tests.ExtendedTestResult):
1016
def startTests(self): self.calls += 1
1017
result = InstrumentedTestResult(None, None, None, None)
1018
suite = unittest.TestSuite([
1019
unittest.FunctionTestCase(lambda: None),
1020
unittest.FunctionTestCase(lambda: None)])
1022
self.assertEquals(1, result.calls)
1023
self.assertEquals(2, result.count)
1000
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1213
1236
self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1237
self.assertLength(1, self._get_source_tree_calls)
1239
def test_verbose_test_count(self):
1240
"""A verbose test run reports the right test count at the start"""
1241
suite = TestUtil.TestSuite([
1242
unittest.FunctionTestCase(lambda:None),
1243
unittest.FunctionTestCase(lambda:None)])
1244
self.assertEqual(suite.countTestCases(), 2)
1246
runner = tests.TextTestRunner(stream=stream, verbosity=2)
1247
# Need to use the CountingDecorator as that's what sets num_tests
1248
result = self.run_test_runner(runner, tests.CountingDecorator(suite))
1249
self.assertStartsWith(stream.getvalue(), "running 2 tests")
1216
1251
def test_startTestRun(self):
1217
1252
"""run should call result.startTestRun()"""
1655
1690
self.assertEqual('original', obj.test_attr)
1693
class TestTestCloning(tests.TestCase):
1694
"""Tests that test cloning of TestCases (as used by multiply_tests)."""
1696
def test_cloned_testcase_does_not_share_details(self):
1697
"""A TestCase cloned with clone_test does not share mutable attributes
1698
such as details or cleanups.
1700
class Test(tests.TestCase):
1702
self.addDetail('foo', Content('text/plain', lambda: 'foo'))
1703
orig_test = Test('test_foo')
1704
cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
1705
orig_test.run(unittest.TestResult())
1706
self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
1707
self.assertEqual(None, cloned_test.getDetails().get('foo'))
1709
def test_double_apply_scenario_preserves_first_scenario(self):
1710
"""Applying two levels of scenarios to a test preserves the attributes
1711
added by both scenarios.
1713
class Test(tests.TestCase):
1716
test = Test('test_foo')
1717
scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
1718
scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
1719
suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
1720
suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
1721
all_tests = list(tests.iter_suite_tests(suite))
1722
self.assertLength(4, all_tests)
1723
all_xys = sorted((t.x, t.y) for t in all_tests)
1724
self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
1658
1727
# NB: Don't delete this; it's not actually from 0.11!
1659
1728
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1729
def sample_deprecated_function():
2960
3034
self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3037
class TestThreadLeakDetection(tests.TestCase):
3038
"""Ensure when tests leak threads we detect and report it"""
3040
class LeakRecordingResult(tests.ExtendedTestResult):
3042
tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
3044
def _report_thread_leak(self, test, leaks, alive):
3045
self.leaks.append((test, leaks))
3047
def test_testcase_without_addCleanups(self):
3048
"""Check old TestCase instances don't break with leak detection"""
3049
class Test(unittest.TestCase):
3052
addCleanup = None # for when on Python 2.7 with native addCleanup
3053
result = self.LeakRecordingResult()
3055
self.assertIs(getattr(test, "addCleanup", None), None)
3056
result.startTestRun()
3058
result.stopTestRun()
3059
self.assertEqual(result._tests_leaking_threads_count, 0)
3060
self.assertEqual(result.leaks, [])
3062
def test_thread_leak(self):
3063
"""Ensure a thread that outlives the running of a test is reported
3065
Uses a thread that blocks on an event, and is started by the inner
3066
test case. As the thread outlives the inner case's run, it should be
3067
detected as a leak, but the event is then set so that the thread can
3068
be safely joined in cleanup so it's not leaked for real.
3070
event = threading.Event()
3071
thread = threading.Thread(name="Leaker", target=event.wait)
3072
class Test(tests.TestCase):
3073
def test_leak(self):
3075
result = self.LeakRecordingResult()
3076
test = Test("test_leak")
3077
self.addCleanup(thread.join)
3078
self.addCleanup(event.set)
3079
result.startTestRun()
3081
result.stopTestRun()
3082
self.assertEqual(result._tests_leaking_threads_count, 1)
3083
self.assertEqual(result._first_thread_leaker_id, test.id())
3084
self.assertEqual(result.leaks, [(test, set([thread]))])
3085
self.assertContainsString(result.stream.getvalue(), "leaking threads")
3087
def test_multiple_leaks(self):
3088
"""Check multiple leaks are blamed on the test cases at fault
3090
Same concept as the previous test, but has one inner test method that
3091
leaks two threads, and one that doesn't leak at all.
3093
event = threading.Event()
3094
thread_a = threading.Thread(name="LeakerA", target=event.wait)
3095
thread_b = threading.Thread(name="LeakerB", target=event.wait)
3096
thread_c = threading.Thread(name="LeakerC", target=event.wait)
3097
class Test(tests.TestCase):
3098
def test_first_leak(self):
3100
def test_second_no_leak(self):
3102
def test_third_leak(self):
3105
result = self.LeakRecordingResult()
3106
first_test = Test("test_first_leak")
3107
third_test = Test("test_third_leak")
3108
self.addCleanup(thread_a.join)
3109
self.addCleanup(thread_b.join)
3110
self.addCleanup(thread_c.join)
3111
self.addCleanup(event.set)
3112
result.startTestRun()
3114
[first_test, Test("test_second_no_leak"), third_test]
3116
result.stopTestRun()
3117
self.assertEqual(result._tests_leaking_threads_count, 2)
3118
self.assertEqual(result._first_thread_leaker_id, first_test.id())
3119
self.assertEqual(result.leaks, [
3120
(first_test, set([thread_b])),
3121
(third_test, set([thread_a, thread_c]))])
3122
self.assertContainsString(result.stream.getvalue(), "leaking threads")
2963
3125
class TestRunSuite(tests.TestCase):
2965
3127
def test_runner_class(self):