/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: John Arbash Meinel
  • Date: 2010-09-25 20:08:01 UTC
  • mfrom: (5444 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5445.
  • Revision ID: john@arbash-meinel.com-20100925200801-7uf0ux3uwxo9i3x0
Merge bzr.dev 5444 to resolve some small text conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
29
31
from testtools.content_type import ContentType
30
32
from testtools.matchers import (
31
33
    DocTestMatches,
42
44
    lockdir,
43
45
    memorytree,
44
46
    osutils,
45
 
    progress,
46
47
    remote,
47
48
    repository,
48
49
    symbol_versioning,
122
123
        self.failUnlessExists(filename)
123
124
 
124
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
125
139
class TestTransportScenarios(tests.TestCase):
126
140
    """A group of tests that test the transport implementation adaption core.
127
141
 
208
222
    def test_scenarios(self):
209
223
        # check that constructor parameters are passed through to the adapted
210
224
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
212
226
        vfs_factory = "v"
213
227
        server1 = "a"
214
228
        server2 = "b"
312
326
        from bzrlib.tests.per_interrepository import make_scenarios
313
327
        server1 = "a"
314
328
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
330
        scenarios = make_scenarios(server1, server2, formats)
317
331
        self.assertEqual([
318
332
            ('C0,str,str',
319
333
             {'repository_format': 'C1',
320
334
              'repository_format_to': 'C2',
321
335
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
323
338
            ('D0,str,str',
324
339
             {'repository_format': 'D1',
325
340
              'repository_format_to': 'D2',
326
341
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
328
344
            scenarios)
329
345
 
330
346
 
837
853
        """A KnownFailure being raised should trigger several result actions."""
838
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
839
855
            def stopTestRun(self): pass
840
 
            def startTests(self): pass
841
 
            def report_test_start(self, test): pass
 
856
            def report_tests_starting(self): pass
842
857
            def report_known_failure(self, test, err=None, details=None):
843
858
                self._call = test, 'known failure'
844
859
        result = InstrumentedTestResult(None, None, None, None)
894
909
        """Test the behaviour of invoking addNotSupported."""
895
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
896
911
            def stopTestRun(self): pass
897
 
            def startTests(self): pass
898
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
899
913
            def report_unsupported(self, test, feature):
900
914
                self._call = test, feature
901
915
        result = InstrumentedTestResult(None, None, None, None)
940
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
941
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
942
956
            def stopTestRun(self): pass
943
 
            def startTests(self): pass
944
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
945
958
            def addNotSupported(self, test, feature):
946
959
                self._call = test, feature
947
960
        result = InstrumentedTestResult(None, None, None, None)
989
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
990
1003
            calls = 0
991
1004
            def startTests(self): self.calls += 1
992
 
            def report_test_start(self, test): pass
993
1005
        result = InstrumentedTestResult(None, None, None, None)
994
1006
        def test_function():
995
1007
            pass
997
1009
        test.run(result)
998
1010
        self.assertEquals(1, result.calls)
999
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
1000
1025
 
1001
1026
class TestUnicodeFilenameFeature(tests.TestCase):
1002
1027
 
1023
1048
        because of our use of global state.
1024
1049
        """
1025
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1026
 
        old_leak = tests.TestCase._first_thread_leaker_id
1027
1051
        try:
1028
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1029
 
            tests.TestCase._first_thread_leaker_id = None
1030
1053
            return testrunner.run(test)
1031
1054
        finally:
1032
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1033
 
            tests.TestCase._first_thread_leaker_id = old_leak
1034
1056
 
1035
1057
    def test_known_failure_failed_run(self):
1036
1058
        # run a test that generates a known failure which should be printed in
1214
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1215
1237
        self.assertLength(1, self._get_source_tree_calls)
1216
1238
 
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1250
 
1217
1251
    def test_startTestRun(self):
1218
1252
        """run should call result.startTestRun()"""
1219
1253
        calls = []
1758
1792
        self.assertTrue('log' in details)
1759
1793
 
1760
1794
 
 
1795
class TestTestCloning(tests.TestCase):
 
1796
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1797
 
 
1798
    def test_cloned_testcase_does_not_share_details(self):
 
1799
        """A TestCase cloned with clone_test does not share mutable attributes
 
1800
        such as details or cleanups.
 
1801
        """
 
1802
        class Test(tests.TestCase):
 
1803
            def test_foo(self):
 
1804
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1805
        orig_test = Test('test_foo')
 
1806
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1807
        orig_test.run(unittest.TestResult())
 
1808
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1809
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1810
 
 
1811
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1812
        """Applying two levels of scenarios to a test preserves the attributes
 
1813
        added by both scenarios.
 
1814
        """
 
1815
        class Test(tests.TestCase):
 
1816
            def test_foo(self):
 
1817
                pass
 
1818
        test = Test('test_foo')
 
1819
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1820
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1821
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1822
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1823
        all_tests = list(tests.iter_suite_tests(suite))
 
1824
        self.assertLength(4, all_tests)
 
1825
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1826
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1827
 
 
1828
 
1761
1829
# NB: Don't delete this; it's not actually from 0.11!
1762
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1763
1831
def sample_deprecated_function():
3145
3213
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3146
3214
 
3147
3215
 
 
3216
class TestThreadLeakDetection(tests.TestCase):
 
3217
    """Ensure when tests leak threads we detect and report it"""
 
3218
 
 
3219
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3220
        def __init__(self):
 
3221
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3222
            self.leaks = []
 
3223
        def _report_thread_leak(self, test, leaks, alive):
 
3224
            self.leaks.append((test, leaks))
 
3225
 
 
3226
    def test_testcase_without_addCleanups(self):
 
3227
        """Check old TestCase instances don't break with leak detection"""
 
3228
        class Test(unittest.TestCase):
 
3229
            def runTest(self):
 
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3232
        result = self.LeakRecordingResult()
 
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3235
        result.startTestRun()
 
3236
        test.run(result)
 
3237
        result.stopTestRun()
 
3238
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3239
        self.assertEqual(result.leaks, [])
 
3240
        
 
3241
    def test_thread_leak(self):
 
3242
        """Ensure a thread that outlives the running of a test is reported
 
3243
 
 
3244
        Uses a thread that blocks on an event, and is started by the inner
 
3245
        test case. As the thread outlives the inner case's run, it should be
 
3246
        detected as a leak, but the event is then set so that the thread can
 
3247
        be safely joined in cleanup so it's not leaked for real.
 
3248
        """
 
3249
        event = threading.Event()
 
3250
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3251
        class Test(tests.TestCase):
 
3252
            def test_leak(self):
 
3253
                thread.start()
 
3254
        result = self.LeakRecordingResult()
 
3255
        test = Test("test_leak")
 
3256
        self.addCleanup(thread.join)
 
3257
        self.addCleanup(event.set)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3262
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3263
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3264
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3265
 
 
3266
    def test_multiple_leaks(self):
 
3267
        """Check multiple leaks are blamed on the test cases at fault
 
3268
 
 
3269
        Same concept as the previous test, but has one inner test method that
 
3270
        leaks two threads, and one that doesn't leak at all.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3274
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3275
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3276
        class Test(tests.TestCase):
 
3277
            def test_first_leak(self):
 
3278
                thread_b.start()
 
3279
            def test_second_no_leak(self):
 
3280
                pass
 
3281
            def test_third_leak(self):
 
3282
                thread_c.start()
 
3283
                thread_a.start()
 
3284
        result = self.LeakRecordingResult()
 
3285
        first_test = Test("test_first_leak")
 
3286
        third_test = Test("test_third_leak")
 
3287
        self.addCleanup(thread_a.join)
 
3288
        self.addCleanup(thread_b.join)
 
3289
        self.addCleanup(thread_c.join)
 
3290
        self.addCleanup(event.set)
 
3291
        result.startTestRun()
 
3292
        unittest.TestSuite(
 
3293
            [first_test, Test("test_second_no_leak"), third_test]
 
3294
            ).run(result)
 
3295
        result.stopTestRun()
 
3296
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3297
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3298
        self.assertEqual(result.leaks, [
 
3299
            (first_test, set([thread_b])),
 
3300
            (third_test, set([thread_a, thread_c]))])
 
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3302
 
 
3303
 
3148
3304
class TestRunSuite(tests.TestCase):
3149
3305
 
3150
3306
    def test_runner_class(self):