/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Martin Pool
  • Date: 2010-10-08 04:38:25 UTC
  • mfrom: (5462 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5478.
  • Revision ID: mbp@sourcefrog.net-20101008043825-b181r8bo5r3qwb6j
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
 
20
from doctest import ELLIPSIS
20
21
import os
21
22
import signal
22
23
import sys
 
24
import threading
23
25
import time
24
26
import unittest
25
27
import warnings
26
28
 
 
29
from testtools import MultiTestResult
 
30
from testtools.content import Content
 
31
from testtools.content_type import ContentType
 
32
from testtools.matchers import (
 
33
    DocTestMatches,
 
34
    Equals,
 
35
    )
 
36
import testtools.tests.helpers
 
37
 
27
38
import bzrlib
28
39
from bzrlib import (
29
40
    branchbuilder,
33
44
    lockdir,
34
45
    memorytree,
35
46
    osutils,
36
 
    progress,
37
47
    remote,
38
48
    repository,
39
49
    symbol_versioning,
52
62
    deprecated_method,
53
63
    )
54
64
from bzrlib.tests import (
55
 
    SubUnitFeature,
 
65
    features,
56
66
    test_lsprof,
 
67
    test_server,
57
68
    test_sftp_transport,
58
69
    TestUtil,
59
70
    )
60
 
from bzrlib.trace import note
61
 
from bzrlib.transport.memory import MemoryServer, MemoryTransport
 
71
from bzrlib.trace import note, mutter
 
72
from bzrlib.transport import memory
62
73
from bzrlib.version import _get_bzr_source_tree
63
74
 
64
75
 
78
89
                          TestUtil._load_module_by_name,
79
90
                          'bzrlib.no-name-yet')
80
91
 
 
92
 
81
93
class MetaTestLog(tests.TestCase):
82
94
 
83
95
    def test_logging(self):
84
96
        """Test logs are captured when a test fails."""
85
97
        self.log('a test message')
86
 
        self._log_file.flush()
87
 
        self.assertContainsRe(self._get_log(keep_log_file=True),
88
 
                              'a test message\n')
 
98
        details = self.getDetails()
 
99
        log = details['log']
 
100
        self.assertThat(log.content_type, Equals(ContentType(
 
101
            "text", "plain", {"charset": "utf8"})))
 
102
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
 
103
        self.assertThat(self.get_log(),
 
104
            DocTestMatches(u"...a test message\n", ELLIPSIS))
89
105
 
90
106
 
91
107
class TestUnicodeFilename(tests.TestCase):
107
123
        self.failUnlessExists(filename)
108
124
 
109
125
 
 
126
class TestClassesAvailable(tests.TestCase):
 
127
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
128
 
 
129
    def test_test_case(self):
 
130
        from bzrlib.tests import TestCase
 
131
 
 
132
    def test_test_loader(self):
 
133
        from bzrlib.tests import TestLoader
 
134
 
 
135
    def test_test_suite(self):
 
136
        from bzrlib.tests import TestSuite
 
137
 
 
138
 
110
139
class TestTransportScenarios(tests.TestCase):
111
140
    """A group of tests that test the transport implementation adaption core.
112
141
 
193
222
    def test_scenarios(self):
194
223
        # check that constructor parameters are passed through to the adapted
195
224
        # test.
196
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
225
        from bzrlib.tests.per_controldir import make_scenarios
197
226
        vfs_factory = "v"
198
227
        server1 = "a"
199
228
        server2 = "b"
297
326
        from bzrlib.tests.per_interrepository import make_scenarios
298
327
        server1 = "a"
299
328
        server2 = "b"
300
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
329
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
301
330
        scenarios = make_scenarios(server1, server2, formats)
302
331
        self.assertEqual([
303
332
            ('C0,str,str',
304
333
             {'repository_format': 'C1',
305
334
              'repository_format_to': 'C2',
306
335
              'transport_readonly_server': 'b',
307
 
              'transport_server': 'a'}),
 
336
              'transport_server': 'a',
 
337
              'extra_setup': 'C3'}),
308
338
            ('D0,str,str',
309
339
             {'repository_format': 'D1',
310
340
              'repository_format_to': 'D2',
311
341
              'transport_readonly_server': 'b',
312
 
              'transport_server': 'a'})],
 
342
              'transport_server': 'a',
 
343
              'extra_setup': 'D3'})],
313
344
            scenarios)
314
345
 
315
346
 
594
625
                l.attempt_lock()
595
626
        test = TestDanglingLock('test_function')
596
627
        result = test.run()
 
628
        total_failures = result.errors + result.failures
597
629
        if self._lock_check_thorough:
598
 
            self.assertEqual(1, len(result.errors))
 
630
            self.assertEqual(1, len(total_failures))
599
631
        else:
600
632
            # When _lock_check_thorough is disabled, then we don't trigger a
601
633
            # failure
602
 
            self.assertEqual(0, len(result.errors))
 
634
            self.assertEqual(0, len(total_failures))
603
635
 
604
636
 
605
637
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
606
638
    """Tests for the convenience functions TestCaseWithTransport introduces."""
607
639
 
608
640
    def test_get_readonly_url_none(self):
609
 
        from bzrlib.transport import get_transport
610
 
        from bzrlib.transport.memory import MemoryServer
611
641
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
612
 
        self.vfs_transport_factory = MemoryServer
 
642
        self.vfs_transport_factory = memory.MemoryServer
613
643
        self.transport_readonly_server = None
614
644
        # calling get_readonly_transport() constructs a decorator on the url
615
645
        # for the server
616
646
        url = self.get_readonly_url()
617
647
        url2 = self.get_readonly_url('foo/bar')
618
 
        t = get_transport(url)
619
 
        t2 = get_transport(url2)
 
648
        t = transport.get_transport(url)
 
649
        t2 = transport.get_transport(url2)
620
650
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
621
651
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
622
652
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
623
653
 
624
654
    def test_get_readonly_url_http(self):
625
655
        from bzrlib.tests.http_server import HttpServer
626
 
        from bzrlib.transport import get_transport
627
 
        from bzrlib.transport.local import LocalURLServer
628
656
        from bzrlib.transport.http import HttpTransportBase
629
 
        self.transport_server = LocalURLServer
 
657
        self.transport_server = test_server.LocalURLServer
630
658
        self.transport_readonly_server = HttpServer
631
659
        # calling get_readonly_transport() gives us a HTTP server instance.
632
660
        url = self.get_readonly_url()
633
661
        url2 = self.get_readonly_url('foo/bar')
634
662
        # the transport returned may be any HttpTransportBase subclass
635
 
        t = get_transport(url)
636
 
        t2 = get_transport(url2)
 
663
        t = transport.get_transport(url)
 
664
        t2 = transport.get_transport(url2)
637
665
        self.failUnless(isinstance(t, HttpTransportBase))
638
666
        self.failUnless(isinstance(t2, HttpTransportBase))
639
667
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
663
691
 
664
692
    def setUp(self):
665
693
        super(TestTestCaseTransports, self).setUp()
666
 
        self.vfs_transport_factory = MemoryServer
 
694
        self.vfs_transport_factory = memory.MemoryServer
667
695
 
668
696
    def test_make_bzrdir_preserves_transport(self):
669
697
        t = self.get_transport()
670
698
        result_bzrdir = self.make_bzrdir('subdir')
671
699
        self.assertIsInstance(result_bzrdir.transport,
672
 
                              MemoryTransport)
 
700
                              memory.MemoryTransport)
673
701
        # should not be on disk, should only be in memory
674
702
        self.failIfExists('subdir')
675
703
 
677
705
class TestChrootedTest(tests.ChrootedTestCase):
678
706
 
679
707
    def test_root_is_root(self):
680
 
        from bzrlib.transport import get_transport
681
 
        t = get_transport(self.get_readonly_url())
 
708
        t = transport.get_transport(self.get_readonly_url())
682
709
        url = t.base
683
710
        self.assertEqual(url, t.clone('..').base)
684
711
 
687
714
 
688
715
    def test_profiles_tests(self):
689
716
        self.requireFeature(test_lsprof.LSProfFeature)
690
 
        terminal = unittest.TestResult()
 
717
        terminal = testtools.tests.helpers.ExtendedTestResult()
691
718
        result = tests.ProfileResult(terminal)
692
719
        class Sample(tests.TestCase):
693
720
            def a(self):
695
722
            def sample_function(self):
696
723
                pass
697
724
        test = Sample("a")
698
 
        test.attrs_to_keep = test.attrs_to_keep + ('_benchcalls',)
699
725
        test.run(result)
700
 
        self.assertLength(1, test._benchcalls)
 
726
        case = terminal._events[0][1]
 
727
        self.assertLength(1, case._benchcalls)
701
728
        # We must be able to unpack it as the test reporting code wants
702
 
        (_, _, _), stats = test._benchcalls[0]
 
729
        (_, _, _), stats = case._benchcalls[0]
703
730
        self.assertTrue(callable(stats.pprint))
704
731
 
705
732
 
710
737
                descriptions=0,
711
738
                verbosity=1,
712
739
                )
713
 
        test_case.run(result)
714
 
        timed_string = result._testTimeString(test_case)
 
740
        capture = testtools.tests.helpers.ExtendedTestResult()
 
741
        test_case.run(MultiTestResult(result, capture))
 
742
        run_case = capture._events[0][1]
 
743
        timed_string = result._testTimeString(run_case)
715
744
        self.assertContainsRe(timed_string, expected_re)
716
745
 
717
746
    def test_test_reporting(self):
738
767
    def _patch_get_bzr_source_tree(self):
739
768
        # Reading from the actual source tree breaks isolation, but we don't
740
769
        # want to assume that thats *all* that would happen.
741
 
        def _get_bzr_source_tree():
742
 
            return None
743
 
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
744
 
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
745
 
        def restore():
746
 
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
747
 
        self.addCleanup(restore)
 
770
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree', lambda: None)
748
771
 
749
772
    def test_assigned_benchmark_file_stores_date(self):
750
773
        self._patch_get_bzr_source_tree()
794
817
        self.requireFeature(test_lsprof.LSProfFeature)
795
818
        result_stream = StringIO()
796
819
        result = bzrlib.tests.VerboseTestResult(
797
 
            unittest._WritelnDecorator(result_stream),
 
820
            result_stream,
798
821
            descriptions=0,
799
822
            verbosity=2,
800
823
            )
830
853
        """A KnownFailure being raised should trigger several result actions."""
831
854
        class InstrumentedTestResult(tests.ExtendedTestResult):
832
855
            def stopTestRun(self): pass
833
 
            def startTests(self): pass
834
 
            def report_test_start(self, test): pass
835
 
            def report_known_failure(self, test, err):
836
 
                self._call = test, err
 
856
            def report_tests_starting(self): pass
 
857
            def report_known_failure(self, test, err=None, details=None):
 
858
                self._call = test, 'known failure'
837
859
        result = InstrumentedTestResult(None, None, None, None)
838
860
        class Test(tests.TestCase):
839
861
            def test_function(self):
842
864
        test.run(result)
843
865
        # it should invoke 'report_known_failure'.
844
866
        self.assertEqual(2, len(result._call))
845
 
        self.assertEqual(test, result._call[0])
846
 
        self.assertEqual(tests.KnownFailure, result._call[1][0])
847
 
        self.assertIsInstance(result._call[1][1], tests.KnownFailure)
 
867
        self.assertEqual(test.id(), result._call[0].id())
 
868
        self.assertEqual('known failure', result._call[1])
848
869
        # we dont introspec the traceback, if the rest is ok, it would be
849
870
        # exceptional for it not to be.
850
871
        # it should update the known_failure_count on the object.
856
877
        # verbose test output formatting
857
878
        result_stream = StringIO()
858
879
        result = bzrlib.tests.VerboseTestResult(
859
 
            unittest._WritelnDecorator(result_stream),
 
880
            result_stream,
860
881
            descriptions=0,
861
882
            verbosity=2,
862
883
            )
872
893
        output = result_stream.getvalue()[prefix:]
873
894
        lines = output.splitlines()
874
895
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
896
        if sys.version_info > (2, 7):
 
897
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
898
                self.assertNotEqual, lines[1], '    ')
875
899
        self.assertEqual(lines[1], '    foo')
876
900
        self.assertEqual(2, len(lines))
877
901
 
885
909
        """Test the behaviour of invoking addNotSupported."""
886
910
        class InstrumentedTestResult(tests.ExtendedTestResult):
887
911
            def stopTestRun(self): pass
888
 
            def startTests(self): pass
889
 
            def report_test_start(self, test): pass
 
912
            def report_tests_starting(self): pass
890
913
            def report_unsupported(self, test, feature):
891
914
                self._call = test, feature
892
915
        result = InstrumentedTestResult(None, None, None, None)
911
934
        # verbose test output formatting
912
935
        result_stream = StringIO()
913
936
        result = bzrlib.tests.VerboseTestResult(
914
 
            unittest._WritelnDecorator(result_stream),
 
937
            result_stream,
915
938
            descriptions=0,
916
939
            verbosity=2,
917
940
            )
931
954
        """An UnavailableFeature being raised should invoke addNotSupported."""
932
955
        class InstrumentedTestResult(tests.ExtendedTestResult):
933
956
            def stopTestRun(self): pass
934
 
            def startTests(self): pass
935
 
            def report_test_start(self, test): pass
 
957
            def report_tests_starting(self): pass
936
958
            def addNotSupported(self, test, feature):
937
959
                self._call = test, feature
938
960
        result = InstrumentedTestResult(None, None, None, None)
944
966
        test.run(result)
945
967
        # it should invoke 'addNotSupported'.
946
968
        self.assertEqual(2, len(result._call))
947
 
        self.assertEqual(test, result._call[0])
 
969
        self.assertEqual(test.id(), result._call[0].id())
948
970
        self.assertEqual(feature, result._call[1])
949
971
        # and not count as an error
950
972
        self.assertEqual(0, result.error_count)
980
1002
        class InstrumentedTestResult(tests.ExtendedTestResult):
981
1003
            calls = 0
982
1004
            def startTests(self): self.calls += 1
983
 
            def report_test_start(self, test): pass
984
1005
        result = InstrumentedTestResult(None, None, None, None)
985
1006
        def test_function():
986
1007
            pass
988
1009
        test.run(result)
989
1010
        self.assertEquals(1, result.calls)
990
1011
 
 
1012
    def test_startTests_only_once(self):
 
1013
        """With multiple tests startTests should still only be called once"""
 
1014
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1015
            calls = 0
 
1016
            def startTests(self): self.calls += 1
 
1017
        result = InstrumentedTestResult(None, None, None, None)
 
1018
        suite = unittest.TestSuite([
 
1019
            unittest.FunctionTestCase(lambda: None),
 
1020
            unittest.FunctionTestCase(lambda: None)])
 
1021
        suite.run(result)
 
1022
        self.assertEquals(1, result.calls)
 
1023
        self.assertEquals(2, result.count)
 
1024
 
991
1025
 
992
1026
class TestUnicodeFilenameFeature(tests.TestCase):
993
1027
 
1014
1048
        because of our use of global state.
1015
1049
        """
1016
1050
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1017
 
        old_leak = tests.TestCase._first_thread_leaker_id
1018
1051
        try:
1019
1052
            tests.TestCaseInTempDir.TEST_ROOT = None
1020
 
            tests.TestCase._first_thread_leaker_id = None
1021
1053
            return testrunner.run(test)
1022
1054
        finally:
1023
1055
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1024
 
            tests.TestCase._first_thread_leaker_id = old_leak
1025
1056
 
1026
1057
    def test_known_failure_failed_run(self):
1027
1058
        # run a test that generates a known failure which should be printed in
1028
1059
        # the final output when real failures occur.
1029
1060
        class Test(tests.TestCase):
1030
1061
            def known_failure_test(self):
1031
 
                raise tests.KnownFailure('failed')
 
1062
                self.expectFailure('failed', self.assertTrue, False)
1032
1063
        test = unittest.TestSuite()
1033
1064
        test.addTest(Test("known_failure_test"))
1034
1065
        def failing_test():
1035
 
            raise AssertionError('foo')
 
1066
            self.fail('foo')
1036
1067
        test.addTest(unittest.FunctionTestCase(failing_test))
1037
1068
        stream = StringIO()
1038
1069
        runner = tests.TextTestRunner(stream=stream)
1039
1070
        result = self.run_test_runner(runner, test)
1040
1071
        lines = stream.getvalue().splitlines()
1041
1072
        self.assertContainsRe(stream.getvalue(),
1042
 
            '(?sm)^testing.*$'
 
1073
            '(?sm)^bzr selftest.*$'
1043
1074
            '.*'
1044
1075
            '^======================================================================\n'
1045
 
            '^FAIL: unittest.FunctionTestCase \\(failing_test\\)\n'
 
1076
            '^FAIL: failing_test\n'
1046
1077
            '^----------------------------------------------------------------------\n'
1047
1078
            'Traceback \\(most recent call last\\):\n'
1048
1079
            '  .*' # File .*, line .*, in failing_test' - but maybe not from .pyc
1049
 
            '    raise AssertionError\\(\'foo\'\\)\n'
 
1080
            '    self.fail\\(\'foo\'\\)\n'
1050
1081
            '.*'
1051
1082
            '^----------------------------------------------------------------------\n'
1052
1083
            '.*'
1058
1089
        # the final output.
1059
1090
        class Test(tests.TestCase):
1060
1091
            def known_failure_test(self):
1061
 
                raise tests.KnownFailure('failed')
 
1092
                self.expectFailure('failed', self.assertTrue, False)
1062
1093
        test = Test("known_failure_test")
1063
1094
        stream = StringIO()
1064
1095
        runner = tests.TextTestRunner(stream=stream)
1186
1217
        # Reading from the actual source tree breaks isolation, but we don't
1187
1218
        # want to assume that thats *all* that would happen.
1188
1219
        self._get_source_tree_calls = []
1189
 
        def _get_bzr_source_tree():
 
1220
        def new_get():
1190
1221
            self._get_source_tree_calls.append("called")
1191
1222
            return None
1192
 
        orig_get_bzr_source_tree = bzrlib.version._get_bzr_source_tree
1193
 
        bzrlib.version._get_bzr_source_tree = _get_bzr_source_tree
1194
 
        def restore():
1195
 
            bzrlib.version._get_bzr_source_tree = orig_get_bzr_source_tree
1196
 
        self.addCleanup(restore)
 
1223
        self.overrideAttr(bzrlib.version, '_get_bzr_source_tree',  new_get)
1197
1224
 
1198
1225
    def test_bench_history(self):
1199
1226
        # tests that the running the benchmark passes bench_history into
1209
1236
        self.assertContainsRe(output_string, "--date [0-9.]+")
1210
1237
        self.assertLength(1, self._get_source_tree_calls)
1211
1238
 
1212
 
    def assertLogDeleted(self, test):
1213
 
        log = test._get_log()
1214
 
        self.assertEqual("DELETED log file to reduce memory footprint", log)
1215
 
        self.assertEqual('', test._log_contents)
1216
 
        self.assertIs(None, test._log_file_name)
1217
 
 
1218
 
    def test_success_log_deleted(self):
1219
 
        """Successful tests have their log deleted"""
1220
 
 
1221
 
        class LogTester(tests.TestCase):
1222
 
 
1223
 
            def test_success(self):
1224
 
                self.log('this will be removed\n')
1225
 
 
1226
 
        sio = StringIO()
1227
 
        runner = tests.TextTestRunner(stream=sio)
1228
 
        test = LogTester('test_success')
1229
 
        result = self.run_test_runner(runner, test)
1230
 
 
1231
 
        self.assertLogDeleted(test)
1232
 
 
1233
 
    def test_skipped_log_deleted(self):
1234
 
        """Skipped tests have their log deleted"""
1235
 
 
1236
 
        class LogTester(tests.TestCase):
1237
 
 
1238
 
            def test_skipped(self):
1239
 
                self.log('this will be removed\n')
1240
 
                raise tests.TestSkipped()
1241
 
 
1242
 
        sio = StringIO()
1243
 
        runner = tests.TextTestRunner(stream=sio)
1244
 
        test = LogTester('test_skipped')
1245
 
        result = self.run_test_runner(runner, test)
1246
 
 
1247
 
        self.assertLogDeleted(test)
1248
 
 
1249
 
    def test_not_aplicable_log_deleted(self):
1250
 
        """Not applicable tests have their log deleted"""
1251
 
 
1252
 
        class LogTester(tests.TestCase):
1253
 
 
1254
 
            def test_not_applicable(self):
1255
 
                self.log('this will be removed\n')
1256
 
                raise tests.TestNotApplicable()
1257
 
 
1258
 
        sio = StringIO()
1259
 
        runner = tests.TextTestRunner(stream=sio)
1260
 
        test = LogTester('test_not_applicable')
1261
 
        result = self.run_test_runner(runner, test)
1262
 
 
1263
 
        self.assertLogDeleted(test)
1264
 
 
1265
 
    def test_known_failure_log_deleted(self):
1266
 
        """Know failure tests have their log deleted"""
1267
 
 
1268
 
        class LogTester(tests.TestCase):
1269
 
 
1270
 
            def test_known_failure(self):
1271
 
                self.log('this will be removed\n')
1272
 
                raise tests.KnownFailure()
1273
 
 
1274
 
        sio = StringIO()
1275
 
        runner = tests.TextTestRunner(stream=sio)
1276
 
        test = LogTester('test_known_failure')
1277
 
        result = self.run_test_runner(runner, test)
1278
 
 
1279
 
        self.assertLogDeleted(test)
1280
 
 
1281
 
    def test_fail_log_kept(self):
1282
 
        """Failed tests have their log kept"""
1283
 
 
1284
 
        class LogTester(tests.TestCase):
1285
 
 
1286
 
            def test_fail(self):
1287
 
                self.log('this will be kept\n')
1288
 
                self.fail('this test fails')
1289
 
 
1290
 
        sio = StringIO()
1291
 
        runner = tests.TextTestRunner(stream=sio)
1292
 
        test = LogTester('test_fail')
1293
 
        result = self.run_test_runner(runner, test)
1294
 
 
1295
 
        text = sio.getvalue()
1296
 
        self.assertContainsRe(text, 'this will be kept')
1297
 
        self.assertContainsRe(text, 'this test fails')
1298
 
 
1299
 
        log = test._get_log()
1300
 
        self.assertContainsRe(log, 'this will be kept')
1301
 
        self.assertEqual(log, test._log_contents)
1302
 
 
1303
 
    def test_error_log_kept(self):
1304
 
        """Tests with errors have their log kept"""
1305
 
 
1306
 
        class LogTester(tests.TestCase):
1307
 
 
1308
 
            def test_error(self):
1309
 
                self.log('this will be kept\n')
1310
 
                raise ValueError('random exception raised')
1311
 
 
1312
 
        sio = StringIO()
1313
 
        runner = tests.TextTestRunner(stream=sio)
1314
 
        test = LogTester('test_error')
1315
 
        result = self.run_test_runner(runner, test)
1316
 
 
1317
 
        text = sio.getvalue()
1318
 
        self.assertContainsRe(text, 'this will be kept')
1319
 
        self.assertContainsRe(text, 'random exception raised')
1320
 
 
1321
 
        log = test._get_log()
1322
 
        self.assertContainsRe(log, 'this will be kept')
1323
 
        self.assertEqual(log, test._log_contents)
 
1239
    def test_verbose_test_count(self):
 
1240
        """A verbose test run reports the right test count at the start"""
 
1241
        suite = TestUtil.TestSuite([
 
1242
            unittest.FunctionTestCase(lambda:None),
 
1243
            unittest.FunctionTestCase(lambda:None)])
 
1244
        self.assertEqual(suite.countTestCases(), 2)
 
1245
        stream = StringIO()
 
1246
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1247
        # Need to use the CountingDecorator as that's what sets num_tests
 
1248
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1249
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1324
1250
 
1325
1251
    def test_startTestRun(self):
1326
1252
        """run should call result.startTestRun()"""
1420
1346
        self.assertEqual(flags, bzrlib.debug.debug_flags)
1421
1347
 
1422
1348
    def change_selftest_debug_flags(self, new_flags):
1423
 
        orig_selftest_flags = tests.selftest_debug_flags
1424
 
        self.addCleanup(self._restore_selftest_debug_flags, orig_selftest_flags)
1425
 
        tests.selftest_debug_flags = set(new_flags)
1426
 
 
1427
 
    def _restore_selftest_debug_flags(self, flags):
1428
 
        tests.selftest_debug_flags = flags
 
1349
        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
1429
1350
 
1430
1351
    def test_allow_debug_flag(self):
1431
1352
        """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being
1491
1412
        self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags)
1492
1413
 
1493
1414
    def make_test_result(self):
 
1415
        """Get a test result that writes to the test log file."""
1494
1416
        return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1)
1495
1417
 
1496
1418
    def inner_test(self):
1504
1426
        result = self.make_test_result()
1505
1427
        self.inner_test.run(result)
1506
1428
        note("outer finish")
 
1429
        self.addCleanup(osutils.delete_any, self._log_file_name)
1507
1430
 
1508
1431
    def test_trace_nesting(self):
1509
1432
        # this tests that each test case nests its trace facility correctly.
1521
1444
        outer_test = TestTestCase("outer_child")
1522
1445
        result = self.make_test_result()
1523
1446
        outer_test.run(result)
1524
 
        self.addCleanup(osutils.delete_any, outer_test._log_file_name)
1525
1447
        self.assertEqual(original_trace, bzrlib.trace._trace_file)
1526
1448
 
1527
1449
    def method_that_times_a_bit_twice(self):
1534
1456
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1535
1457
        output_stream = StringIO()
1536
1458
        result = bzrlib.tests.VerboseTestResult(
1537
 
            unittest._WritelnDecorator(output_stream),
 
1459
            output_stream,
1538
1460
            descriptions=0,
1539
1461
            verbosity=2)
1540
1462
        sample_test.run(result)
1581
1503
        # permitted.
1582
1504
        # Manually set one up (TestCase doesn't and shouldn't provide magic
1583
1505
        # machinery)
1584
 
        transport_server = MemoryServer()
1585
 
        transport_server.setUp()
1586
 
        self.addCleanup(transport_server.tearDown)
 
1506
        transport_server = memory.MemoryServer()
 
1507
        transport_server.start_server()
 
1508
        self.addCleanup(transport_server.stop_server)
1587
1509
        t = transport.get_transport(transport_server.get_url())
1588
1510
        bzrdir.BzrDir.create(t.base)
1589
1511
        self.assertRaises(errors.BzrError,
1642
1564
        """Test disabled tests behaviour with support aware results."""
1643
1565
        test = SampleTestCase('_test_pass')
1644
1566
        class DisabledFeature(object):
 
1567
            def __eq__(self, other):
 
1568
                return isinstance(other, DisabledFeature)
1645
1569
            def available(self):
1646
1570
                return False
1647
1571
        the_feature = DisabledFeature()
1658
1582
                self.calls.append(('addNotSupported', test, feature))
1659
1583
        result = InstrumentedTestResult()
1660
1584
        test.run(result)
 
1585
        case = result.calls[0][1]
1661
1586
        self.assertEqual([
1662
 
            ('startTest', test),
1663
 
            ('addNotSupported', test, the_feature),
1664
 
            ('stopTest', test),
 
1587
            ('startTest', case),
 
1588
            ('addNotSupported', case, the_feature),
 
1589
            ('stopTest', case),
1665
1590
            ],
1666
1591
            result.calls)
1667
1592
 
1668
1593
    def test_start_server_registers_url(self):
1669
 
        transport_server = MemoryServer()
 
1594
        transport_server = memory.MemoryServer()
1670
1595
        # A little strict, but unlikely to be changed soon.
1671
1596
        self.assertEqual([], self._bzr_selftest_roots)
1672
1597
        self.start_server(transport_server)
1728
1653
        self.assertRaises(AssertionError,
1729
1654
            self.assertListRaises, _TestException, success_generator)
1730
1655
 
 
1656
    def test_overrideAttr_without_value(self):
 
1657
        self.test_attr = 'original' # Define a test attribute
 
1658
        obj = self # Make 'obj' visible to the embedded test
 
1659
        class Test(tests.TestCase):
 
1660
 
 
1661
            def setUp(self):
 
1662
                tests.TestCase.setUp(self)
 
1663
                self.orig = self.overrideAttr(obj, 'test_attr')
 
1664
 
 
1665
            def test_value(self):
 
1666
                self.assertEqual('original', self.orig)
 
1667
                self.assertEqual('original', obj.test_attr)
 
1668
                obj.test_attr = 'modified'
 
1669
                self.assertEqual('modified', obj.test_attr)
 
1670
 
 
1671
        test = Test('test_value')
 
1672
        test.run(unittest.TestResult())
 
1673
        self.assertEqual('original', obj.test_attr)
 
1674
 
 
1675
    def test_overrideAttr_with_value(self):
 
1676
        self.test_attr = 'original' # Define a test attribute
 
1677
        obj = self # Make 'obj' visible to the embedded test
 
1678
        class Test(tests.TestCase):
 
1679
 
 
1680
            def setUp(self):
 
1681
                tests.TestCase.setUp(self)
 
1682
                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
 
1683
 
 
1684
            def test_value(self):
 
1685
                self.assertEqual('original', self.orig)
 
1686
                self.assertEqual('modified', obj.test_attr)
 
1687
 
 
1688
        test = Test('test_value')
 
1689
        test.run(unittest.TestResult())
 
1690
        self.assertEqual('original', obj.test_attr)
 
1691
 
 
1692
 
 
1693
class _MissingFeature(tests.Feature):
 
1694
    def _probe(self):
 
1695
        return False
 
1696
missing_feature = _MissingFeature()
 
1697
 
 
1698
 
 
1699
def _get_test(name):
 
1700
    """Get an instance of a specific example test.
 
1701
 
 
1702
    We protect this in a function so that they don't auto-run in the test
 
1703
    suite.
 
1704
    """
 
1705
 
 
1706
    class ExampleTests(tests.TestCase):
 
1707
 
 
1708
        def test_fail(self):
 
1709
            mutter('this was a failing test')
 
1710
            self.fail('this test will fail')
 
1711
 
 
1712
        def test_error(self):
 
1713
            mutter('this test errored')
 
1714
            raise RuntimeError('gotcha')
 
1715
 
 
1716
        def test_missing_feature(self):
 
1717
            mutter('missing the feature')
 
1718
            self.requireFeature(missing_feature)
 
1719
 
 
1720
        def test_skip(self):
 
1721
            mutter('this test will be skipped')
 
1722
            raise tests.TestSkipped('reason')
 
1723
 
 
1724
        def test_success(self):
 
1725
            mutter('this test succeeds')
 
1726
 
 
1727
        def test_xfail(self):
 
1728
            mutter('test with expected failure')
 
1729
            self.knownFailure('this_fails')
 
1730
 
 
1731
        def test_unexpected_success(self):
 
1732
            mutter('test with unexpected success')
 
1733
            self.expectFailure('should_fail', lambda: None)
 
1734
 
 
1735
    return ExampleTests(name)
 
1736
 
 
1737
 
 
1738
class TestTestCaseLogDetails(tests.TestCase):
 
1739
 
 
1740
    def _run_test(self, test_name):
 
1741
        test = _get_test(test_name)
 
1742
        result = testtools.TestResult()
 
1743
        test.run(result)
 
1744
        return result
 
1745
 
 
1746
    def test_fail_has_log(self):
 
1747
        result = self._run_test('test_fail')
 
1748
        self.assertEqual(1, len(result.failures))
 
1749
        result_content = result.failures[0][1]
 
1750
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1751
        self.assertContainsRe(result_content, 'this was a failing test')
 
1752
 
 
1753
    def test_error_has_log(self):
 
1754
        result = self._run_test('test_error')
 
1755
        self.assertEqual(1, len(result.errors))
 
1756
        result_content = result.errors[0][1]
 
1757
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1758
        self.assertContainsRe(result_content, 'this test errored')
 
1759
 
 
1760
    def test_skip_has_no_log(self):
 
1761
        result = self._run_test('test_skip')
 
1762
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1763
        skips = result.skip_reasons['reason']
 
1764
        self.assertEqual(1, len(skips))
 
1765
        test = skips[0]
 
1766
        self.assertFalse('log' in test.getDetails())
 
1767
 
 
1768
    def test_missing_feature_has_no_log(self):
 
1769
        # testtools doesn't know about addNotSupported, so it just gets
 
1770
        # considered as a skip
 
1771
        result = self._run_test('test_missing_feature')
 
1772
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1773
        skips = result.skip_reasons[missing_feature]
 
1774
        self.assertEqual(1, len(skips))
 
1775
        test = skips[0]
 
1776
        self.assertFalse('log' in test.getDetails())
 
1777
 
 
1778
    def test_xfail_has_no_log(self):
 
1779
        result = self._run_test('test_xfail')
 
1780
        self.assertEqual(1, len(result.expectedFailures))
 
1781
        result_content = result.expectedFailures[0][1]
 
1782
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1783
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1784
 
 
1785
    def test_unexpected_success_has_log(self):
 
1786
        result = self._run_test('test_unexpected_success')
 
1787
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1788
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1789
        # expectedFailures is a list of reasons?
 
1790
        test = result.unexpectedSuccesses[0]
 
1791
        details = test.getDetails()
 
1792
        self.assertTrue('log' in details)
 
1793
 
 
1794
 
 
1795
class TestTestCloning(tests.TestCase):
 
1796
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1797
 
 
1798
    def test_cloned_testcase_does_not_share_details(self):
 
1799
        """A TestCase cloned with clone_test does not share mutable attributes
 
1800
        such as details or cleanups.
 
1801
        """
 
1802
        class Test(tests.TestCase):
 
1803
            def test_foo(self):
 
1804
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1805
        orig_test = Test('test_foo')
 
1806
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1807
        orig_test.run(unittest.TestResult())
 
1808
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1809
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1810
 
 
1811
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1812
        """Applying two levels of scenarios to a test preserves the attributes
 
1813
        added by both scenarios.
 
1814
        """
 
1815
        class Test(tests.TestCase):
 
1816
            def test_foo(self):
 
1817
                pass
 
1818
        test = Test('test_foo')
 
1819
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1820
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1821
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1822
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1823
        all_tests = list(tests.iter_suite_tests(suite))
 
1824
        self.assertLength(4, all_tests)
 
1825
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1826
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1827
 
1731
1828
 
1732
1829
# NB: Don't delete this; it's not actually from 0.11!
1733
1830
@deprecated_function(deprecated_in((0, 11, 0)))
1877
1974
        # make_branch_and_tree has to use local branch and repositories
1878
1975
        # when the vfs transport and local disk are colocated, even if
1879
1976
        # a different transport is in use for url generation.
1880
 
        from bzrlib.transport.fakevfat import FakeVFATServer
1881
 
        self.transport_server = FakeVFATServer
 
1977
        self.transport_server = test_server.FakeVFATServer
1882
1978
        self.assertFalse(self.get_url('t1').startswith('file://'))
1883
1979
        tree = self.make_branch_and_tree('t1')
1884
1980
        base = tree.bzrdir.root_transport.base
1889
1985
                tree.branch.repository.bzrdir.root_transport)
1890
1986
 
1891
1987
 
1892
 
class SelfTestHelper:
 
1988
class SelfTestHelper(object):
1893
1989
 
1894
1990
    def run_selftest(self, **kwargs):
1895
1991
        """Run selftest returning its output."""
1982
2078
        self.assertEqual(expected.getvalue(), repeated.getvalue())
1983
2079
 
1984
2080
    def test_runner_class(self):
1985
 
        self.requireFeature(SubUnitFeature)
 
2081
        self.requireFeature(features.subunit)
1986
2082
        from subunit import ProtocolTestCase
1987
2083
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
1988
2084
            test_suite_factory=self.factory)
2020
2116
        self.assertEqual(transport_server, captured_transport[0])
2021
2117
 
2022
2118
    def test_transport_sftp(self):
2023
 
        try:
2024
 
            import bzrlib.transport.sftp
2025
 
        except errors.ParamikoNotPresent:
2026
 
            raise tests.TestSkipped("Paramiko not present")
2027
 
        self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
 
2119
        self.requireFeature(features.paramiko)
 
2120
        from bzrlib.tests import stub_sftp
 
2121
        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2028
2122
 
2029
2123
    def test_transport_memory(self):
2030
 
        self.check_transport_set(bzrlib.transport.memory.MemoryServer)
 
2124
        self.check_transport_set(memory.MemoryServer)
2031
2125
 
2032
2126
 
2033
2127
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
2048
2142
            load_list='missing file name', list_only=True)
2049
2143
 
2050
2144
 
 
2145
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2146
 
 
2147
    _test_needs_features = [features.subunit]
 
2148
 
 
2149
    def run_subunit_stream(self, test_name):
 
2150
        from subunit import ProtocolTestCase
 
2151
        def factory():
 
2152
            return TestUtil.TestSuite([_get_test(test_name)])
 
2153
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2154
            test_suite_factory=factory)
 
2155
        test = ProtocolTestCase(stream)
 
2156
        result = testtools.TestResult()
 
2157
        test.run(result)
 
2158
        content = stream.getvalue()
 
2159
        return content, result
 
2160
 
 
2161
    def test_fail_has_log(self):
 
2162
        content, result = self.run_subunit_stream('test_fail')
 
2163
        self.assertEqual(1, len(result.failures))
 
2164
        self.assertContainsRe(content, '(?m)^log$')
 
2165
        self.assertContainsRe(content, 'this test will fail')
 
2166
 
 
2167
    def test_error_has_log(self):
 
2168
        content, result = self.run_subunit_stream('test_error')
 
2169
        self.assertContainsRe(content, '(?m)^log$')
 
2170
        self.assertContainsRe(content, 'this test errored')
 
2171
 
 
2172
    def test_skip_has_no_log(self):
 
2173
        content, result = self.run_subunit_stream('test_skip')
 
2174
        self.assertNotContainsRe(content, '(?m)^log$')
 
2175
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2176
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2177
        skips = result.skip_reasons['reason']
 
2178
        self.assertEqual(1, len(skips))
 
2179
        test = skips[0]
 
2180
        # RemotedTestCase doesn't preserve the "details"
 
2181
        ## self.assertFalse('log' in test.getDetails())
 
2182
 
 
2183
    def test_missing_feature_has_no_log(self):
 
2184
        content, result = self.run_subunit_stream('test_missing_feature')
 
2185
        self.assertNotContainsRe(content, '(?m)^log$')
 
2186
        self.assertNotContainsRe(content, 'missing the feature')
 
2187
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2188
        skips = result.skip_reasons['_MissingFeature\n']
 
2189
        self.assertEqual(1, len(skips))
 
2190
        test = skips[0]
 
2191
        # RemotedTestCase doesn't preserve the "details"
 
2192
        ## self.assertFalse('log' in test.getDetails())
 
2193
 
 
2194
    def test_xfail_has_no_log(self):
 
2195
        content, result = self.run_subunit_stream('test_xfail')
 
2196
        self.assertNotContainsRe(content, '(?m)^log$')
 
2197
        self.assertNotContainsRe(content, 'test with expected failure')
 
2198
        self.assertEqual(1, len(result.expectedFailures))
 
2199
        result_content = result.expectedFailures[0][1]
 
2200
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2201
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2202
 
 
2203
    def test_unexpected_success_has_log(self):
 
2204
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2205
        self.assertContainsRe(content, '(?m)^log$')
 
2206
        self.assertContainsRe(content, 'test with unexpected success')
 
2207
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2208
                           ' as a plain success',
 
2209
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2210
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2211
        test = result.unexpectedSuccesses[0]
 
2212
        # RemotedTestCase doesn't preserve the "details"
 
2213
        ## self.assertTrue('log' in test.getDetails())
 
2214
 
 
2215
    def test_success_has_no_log(self):
 
2216
        content, result = self.run_subunit_stream('test_success')
 
2217
        self.assertEqual(1, result.testsRun)
 
2218
        self.assertNotContainsRe(content, '(?m)^log$')
 
2219
        self.assertNotContainsRe(content, 'this test succeeds')
 
2220
 
 
2221
 
2051
2222
class TestRunBzr(tests.TestCase):
2052
2223
 
2053
2224
    out = ''
2416
2587
            os.chdir = orig_chdir
2417
2588
        self.assertEqual(['foo', 'current'], chdirs)
2418
2589
 
 
2590
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2591
        self.get_source_path = lambda: ""
 
2592
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2593
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2594
 
2419
2595
 
2420
2596
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2421
2597
    """Tests that really need to do things with an external bzr."""
2434
2610
        self.assertEqual('bzr: interrupted\n', result[1])
2435
2611
 
2436
2612
 
2437
 
class TestKnownFailure(tests.TestCase):
2438
 
 
2439
 
    def test_known_failure(self):
2440
 
        """Check that KnownFailure is defined appropriately."""
2441
 
        # a KnownFailure is an assertion error for compatability with unaware
2442
 
        # runners.
2443
 
        self.assertIsInstance(tests.KnownFailure(""), AssertionError)
2444
 
 
2445
 
    def test_expect_failure(self):
2446
 
        try:
2447
 
            self.expectFailure("Doomed to failure", self.assertTrue, False)
2448
 
        except tests.KnownFailure, e:
2449
 
            self.assertEqual('Doomed to failure', e.args[0])
2450
 
        try:
2451
 
            self.expectFailure("Doomed to failure", self.assertTrue, True)
2452
 
        except AssertionError, e:
2453
 
            self.assertEqual('Unexpected success.  Should have failed:'
2454
 
                             ' Doomed to failure', e.args[0])
2455
 
        else:
2456
 
            self.fail('Assertion not raised')
2457
 
 
2458
 
 
2459
2613
class TestFeature(tests.TestCase):
2460
2614
 
2461
2615
    def test_caching(self):
2497
2651
        self.assertIs(feature, exception.args[0])
2498
2652
 
2499
2653
 
 
2654
simple_thunk_feature = tests._CompatabilityThunkFeature(
 
2655
    deprecated_in((2, 1, 0)),
 
2656
    'bzrlib.tests.test_selftest',
 
2657
    'simple_thunk_feature','UnicodeFilename',
 
2658
    replacement_module='bzrlib.tests'
 
2659
    )
 
2660
 
 
2661
class Test_CompatibilityFeature(tests.TestCase):
 
2662
 
 
2663
    def test_does_thunk(self):
 
2664
        res = self.callDeprecated(
 
2665
            ['bzrlib.tests.test_selftest.simple_thunk_feature was deprecated'
 
2666
             ' in version 2.1.0. Use bzrlib.tests.UnicodeFilename instead.'],
 
2667
            simple_thunk_feature.available)
 
2668
        self.assertEqual(tests.UnicodeFilename.available(), res)
 
2669
 
 
2670
 
2500
2671
class TestModuleAvailableFeature(tests.TestCase):
2501
2672
 
2502
2673
    def test_available_module(self):
2697
2868
        # the test framework
2698
2869
        self.assertEquals('always fails', str(e))
2699
2870
        # check that there's no traceback in the test log
2700
 
        self.assertNotContainsRe(self._get_log(keep_log_file=True),
2701
 
            r'Traceback')
 
2871
        self.assertNotContainsRe(self.get_log(), r'Traceback')
2702
2872
 
2703
2873
    def test_run_bzr_user_error_caught(self):
2704
2874
        # Running bzr in blackbox mode, normal/expected/user errors should be
2705
2875
        # caught in the regular way and turned into an error message plus exit
2706
2876
        # code.
2707
 
        transport_server = MemoryServer()
2708
 
        transport_server.setUp()
2709
 
        self.addCleanup(transport_server.tearDown)
 
2877
        transport_server = memory.MemoryServer()
 
2878
        transport_server.start_server()
 
2879
        self.addCleanup(transport_server.stop_server)
2710
2880
        url = transport_server.get_url()
2711
2881
        self.permit_url(url)
2712
2882
        out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3)
2856
3026
        # Test that a plausible list of modules to doctest is returned
2857
3027
        # by _test_suite_modules_to_doctest.
2858
3028
        test_list = tests._test_suite_modules_to_doctest()
 
3029
        if __doc__ is None:
 
3030
            # When docstrings are stripped, there are no modules to doctest
 
3031
            self.assertEqual([], test_list)
 
3032
            return
2859
3033
        self.assertSubset([
2860
3034
            'bzrlib.timestamp',
2861
3035
            ],
2868
3042
        # test doubles that supply a few sample tests to load, and check they
2869
3043
        # are loaded.
2870
3044
        calls = []
2871
 
        def _test_suite_testmod_names():
 
3045
        def testmod_names():
2872
3046
            calls.append("testmod_names")
2873
3047
            return [
2874
3048
                'bzrlib.tests.blackbox.test_branch',
2875
3049
                'bzrlib.tests.per_transport',
2876
3050
                'bzrlib.tests.test_selftest',
2877
3051
                ]
2878
 
        original_testmod_names = tests._test_suite_testmod_names
2879
 
        def _test_suite_modules_to_doctest():
 
3052
        self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names)
 
3053
        def doctests():
2880
3054
            calls.append("modules_to_doctest")
 
3055
            if __doc__ is None:
 
3056
                return []
2881
3057
            return ['bzrlib.timestamp']
2882
 
        orig_modules_to_doctest = tests._test_suite_modules_to_doctest
2883
 
        def restore_names():
2884
 
            tests._test_suite_testmod_names = original_testmod_names
2885
 
            tests._test_suite_modules_to_doctest = orig_modules_to_doctest
2886
 
        self.addCleanup(restore_names)
2887
 
        tests._test_suite_testmod_names = _test_suite_testmod_names
2888
 
        tests._test_suite_modules_to_doctest = _test_suite_modules_to_doctest
 
3058
        self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests)
2889
3059
        expected_test_list = [
2890
3060
            # testmod_names
2891
3061
            'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
2892
3062
            ('bzrlib.tests.per_transport.TransportTests'
2893
3063
             '.test_abspath(LocalTransport,LocalURLServer)'),
2894
3064
            'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite',
2895
 
            # modules_to_doctest
2896
 
            'bzrlib.timestamp.format_highres_date',
2897
3065
            # plugins can't be tested that way since selftest may be run with
2898
3066
            # --no-plugins
2899
3067
            ]
 
3068
        if __doc__ is not None:
 
3069
            expected_test_list.extend([
 
3070
                # modules_to_doctest
 
3071
                'bzrlib.timestamp.format_highres_date',
 
3072
                ])
2900
3073
        suite = tests.test_suite()
2901
3074
        self.assertEqual(set(["testmod_names", "modules_to_doctest"]),
2902
3075
            set(calls))
2957
3130
    def test_load_tests(self):
2958
3131
        test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing']
2959
3132
        loader = self._create_loader(test_list)
2960
 
 
2961
3133
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2962
3134
        self.assertEquals(test_list, _test_ids(suite))
2963
3135
 
2964
3136
    def test_exclude_tests(self):
2965
3137
        test_list = ['bogus']
2966
3138
        loader = self._create_loader(test_list)
2967
 
 
2968
3139
        suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler')
2969
3140
        self.assertEquals([], _test_ids(suite))
2970
3141
 
3015
3186
        tpr.register('bar', 'bbb.aaa.rrr')
3016
3187
        tpr.register('bar', 'bBB.aAA.rRR')
3017
3188
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
3018
 
        self.assertContainsRe(self._get_log(keep_log_file=True),
3019
 
                              r'.*bar.*bbb.aaa.rrr.*bBB.aAA.rRR')
 
3189
        self.assertThat(self.get_log(),
 
3190
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
3020
3191
 
3021
3192
    def test_get_unknown_prefix(self):
3022
3193
        tpr = self._get_registry()
3042
3213
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
3043
3214
 
3044
3215
 
 
3216
class TestThreadLeakDetection(tests.TestCase):
 
3217
    """Ensure when tests leak threads we detect and report it"""
 
3218
 
 
3219
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3220
        def __init__(self):
 
3221
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3222
            self.leaks = []
 
3223
        def _report_thread_leak(self, test, leaks, alive):
 
3224
            self.leaks.append((test, leaks))
 
3225
 
 
3226
    def test_testcase_without_addCleanups(self):
 
3227
        """Check old TestCase instances don't break with leak detection"""
 
3228
        class Test(unittest.TestCase):
 
3229
            def runTest(self):
 
3230
                pass
 
3231
            addCleanup = None # for when on Python 2.7 with native addCleanup
 
3232
        result = self.LeakRecordingResult()
 
3233
        test = Test()
 
3234
        self.assertIs(getattr(test, "addCleanup", None), None)
 
3235
        result.startTestRun()
 
3236
        test.run(result)
 
3237
        result.stopTestRun()
 
3238
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3239
        self.assertEqual(result.leaks, [])
 
3240
        
 
3241
    def test_thread_leak(self):
 
3242
        """Ensure a thread that outlives the running of a test is reported
 
3243
 
 
3244
        Uses a thread that blocks on an event, and is started by the inner
 
3245
        test case. As the thread outlives the inner case's run, it should be
 
3246
        detected as a leak, but the event is then set so that the thread can
 
3247
        be safely joined in cleanup so it's not leaked for real.
 
3248
        """
 
3249
        event = threading.Event()
 
3250
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3251
        class Test(tests.TestCase):
 
3252
            def test_leak(self):
 
3253
                thread.start()
 
3254
        result = self.LeakRecordingResult()
 
3255
        test = Test("test_leak")
 
3256
        self.addCleanup(thread.join)
 
3257
        self.addCleanup(event.set)
 
3258
        result.startTestRun()
 
3259
        test.run(result)
 
3260
        result.stopTestRun()
 
3261
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3262
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3263
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3264
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3265
 
 
3266
    def test_multiple_leaks(self):
 
3267
        """Check multiple leaks are blamed on the test cases at fault
 
3268
 
 
3269
        Same concept as the previous test, but has one inner test method that
 
3270
        leaks two threads, and one that doesn't leak at all.
 
3271
        """
 
3272
        event = threading.Event()
 
3273
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3274
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3275
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3276
        class Test(tests.TestCase):
 
3277
            def test_first_leak(self):
 
3278
                thread_b.start()
 
3279
            def test_second_no_leak(self):
 
3280
                pass
 
3281
            def test_third_leak(self):
 
3282
                thread_c.start()
 
3283
                thread_a.start()
 
3284
        result = self.LeakRecordingResult()
 
3285
        first_test = Test("test_first_leak")
 
3286
        third_test = Test("test_third_leak")
 
3287
        self.addCleanup(thread_a.join)
 
3288
        self.addCleanup(thread_b.join)
 
3289
        self.addCleanup(thread_c.join)
 
3290
        self.addCleanup(event.set)
 
3291
        result.startTestRun()
 
3292
        unittest.TestSuite(
 
3293
            [first_test, Test("test_second_no_leak"), third_test]
 
3294
            ).run(result)
 
3295
        result.stopTestRun()
 
3296
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3297
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3298
        self.assertEqual(result.leaks, [
 
3299
            (first_test, set([thread_b])),
 
3300
            (third_test, set([thread_a, thread_c]))])
 
3301
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3302
 
 
3303
 
3045
3304
class TestRunSuite(tests.TestCase):
3046
3305
 
3047
3306
    def test_runner_class(self):