/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_selftest.py

  • Committer: Jelmer Vernooij
  • Date: 2011-02-08 15:41:44 UTC
  • mfrom: (5651.4.1 repo-format-registry)
  • mto: This revision was merged to the branch mainline in revision 5658.
  • Revision ID: jelmer@samba.org-20110208154144-h8mm2hrk87de4w2s
Merge repo format registry branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the test framework."""
18
18
 
19
19
from cStringIO import StringIO
20
 
from doctest import ELLIPSIS
 
20
import doctest
21
21
import os
22
22
import signal
23
23
import sys
 
24
import threading
24
25
import time
25
26
import unittest
26
27
import warnings
27
28
 
28
 
from testtools import MultiTestResult
 
29
from testtools import (
 
30
    ExtendedToOriginalDecorator,
 
31
    MultiTestResult,
 
32
    )
 
33
from testtools.content import Content
29
34
from testtools.content_type import ContentType
30
35
from testtools.matchers import (
31
36
    DocTestMatches,
37
42
from bzrlib import (
38
43
    branchbuilder,
39
44
    bzrdir,
40
 
    debug,
41
45
    errors,
42
46
    lockdir,
43
47
    memorytree,
44
48
    osutils,
45
 
    progress,
46
49
    remote,
47
50
    repository,
48
51
    symbol_versioning,
52
55
    )
53
56
from bzrlib.repofmt import (
54
57
    groupcompress_repo,
55
 
    pack_repo,
56
58
    weaverepo,
57
59
    )
58
60
from bzrlib.symbol_versioning import (
64
66
    features,
65
67
    test_lsprof,
66
68
    test_server,
67
 
    test_sftp_transport,
68
69
    TestUtil,
69
70
    )
70
 
from bzrlib.trace import note
 
71
from bzrlib.trace import note, mutter
71
72
from bzrlib.transport import memory
72
 
from bzrlib.version import _get_bzr_source_tree
73
73
 
74
74
 
75
75
def _test_ids(test_suite):
77
77
    return [t.id() for t in tests.iter_suite_tests(test_suite)]
78
78
 
79
79
 
80
 
class SelftestTests(tests.TestCase):
81
 
 
82
 
    def test_import_tests(self):
83
 
        mod = TestUtil._load_module_by_name('bzrlib.tests.test_selftest')
84
 
        self.assertEqual(mod.SelftestTests, SelftestTests)
85
 
 
86
 
    def test_import_test_failure(self):
87
 
        self.assertRaises(ImportError,
88
 
                          TestUtil._load_module_by_name,
89
 
                          'bzrlib.no-name-yet')
90
 
 
91
 
 
92
80
class MetaTestLog(tests.TestCase):
93
81
 
94
82
    def test_logging(self):
100
88
            "text", "plain", {"charset": "utf8"})))
101
89
        self.assertThat(u"".join(log.iter_text()), Equals(self.get_log()))
102
90
        self.assertThat(self.get_log(),
103
 
            DocTestMatches(u"...a test message\n", ELLIPSIS))
 
91
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
104
92
 
105
93
 
106
94
class TestUnicodeFilename(tests.TestCase):
122
110
        self.failUnlessExists(filename)
123
111
 
124
112
 
 
113
class TestClassesAvailable(tests.TestCase):
 
114
    """As a convenience we expose Test* classes from bzrlib.tests"""
 
115
 
 
116
    def test_test_case(self):
 
117
        from bzrlib.tests import TestCase
 
118
 
 
119
    def test_test_loader(self):
 
120
        from bzrlib.tests import TestLoader
 
121
 
 
122
    def test_test_suite(self):
 
123
        from bzrlib.tests import TestSuite
 
124
 
 
125
 
125
126
class TestTransportScenarios(tests.TestCase):
126
127
    """A group of tests that test the transport implementation adaption core.
127
128
 
208
209
    def test_scenarios(self):
209
210
        # check that constructor parameters are passed through to the adapted
210
211
        # test.
211
 
        from bzrlib.tests.per_bzrdir import make_scenarios
 
212
        from bzrlib.tests.per_controldir import make_scenarios
212
213
        vfs_factory = "v"
213
214
        server1 = "a"
214
215
        server2 = "b"
312
313
        from bzrlib.tests.per_interrepository import make_scenarios
313
314
        server1 = "a"
314
315
        server2 = "b"
315
 
        formats = [("C0", "C1", "C2"), ("D0", "D1", "D2")]
 
316
        formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")]
316
317
        scenarios = make_scenarios(server1, server2, formats)
317
318
        self.assertEqual([
318
319
            ('C0,str,str',
319
320
             {'repository_format': 'C1',
320
321
              'repository_format_to': 'C2',
321
322
              'transport_readonly_server': 'b',
322
 
              'transport_server': 'a'}),
 
323
              'transport_server': 'a',
 
324
              'extra_setup': 'C3'}),
323
325
            ('D0,str,str',
324
326
             {'repository_format': 'D1',
325
327
              'repository_format_to': 'D2',
326
328
              'transport_readonly_server': 'b',
327
 
              'transport_server': 'a'})],
 
329
              'transport_server': 'a',
 
330
              'extra_setup': 'D3'})],
328
331
            scenarios)
329
332
 
330
333
 
609
612
                l.attempt_lock()
610
613
        test = TestDanglingLock('test_function')
611
614
        result = test.run()
 
615
        total_failures = result.errors + result.failures
612
616
        if self._lock_check_thorough:
613
 
            self.assertEqual(1, len(result.errors))
 
617
            self.assertEqual(1, len(total_failures))
614
618
        else:
615
619
            # When _lock_check_thorough is disabled, then we don't trigger a
616
620
            # failure
617
 
            self.assertEqual(0, len(result.errors))
 
621
            self.assertEqual(0, len(total_failures))
618
622
 
619
623
 
620
624
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
621
625
    """Tests for the convenience functions TestCaseWithTransport introduces."""
622
626
 
623
627
    def test_get_readonly_url_none(self):
624
 
        from bzrlib.transport import get_transport
625
628
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
626
629
        self.vfs_transport_factory = memory.MemoryServer
627
630
        self.transport_readonly_server = None
629
632
        # for the server
630
633
        url = self.get_readonly_url()
631
634
        url2 = self.get_readonly_url('foo/bar')
632
 
        t = get_transport(url)
633
 
        t2 = get_transport(url2)
 
635
        t = transport.get_transport(url)
 
636
        t2 = transport.get_transport(url2)
634
637
        self.failUnless(isinstance(t, ReadonlyTransportDecorator))
635
638
        self.failUnless(isinstance(t2, ReadonlyTransportDecorator))
636
639
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
637
640
 
638
641
    def test_get_readonly_url_http(self):
639
642
        from bzrlib.tests.http_server import HttpServer
640
 
        from bzrlib.transport import get_transport
641
643
        from bzrlib.transport.http import HttpTransportBase
642
644
        self.transport_server = test_server.LocalURLServer
643
645
        self.transport_readonly_server = HttpServer
645
647
        url = self.get_readonly_url()
646
648
        url2 = self.get_readonly_url('foo/bar')
647
649
        # the transport returned may be any HttpTransportBase subclass
648
 
        t = get_transport(url)
649
 
        t2 = get_transport(url2)
 
650
        t = transport.get_transport(url)
 
651
        t2 = transport.get_transport(url2)
650
652
        self.failUnless(isinstance(t, HttpTransportBase))
651
653
        self.failUnless(isinstance(t2, HttpTransportBase))
652
654
        self.assertEqual(t2.base[:-1], t.abspath('foo/bar'))
690
692
class TestChrootedTest(tests.ChrootedTestCase):
691
693
 
692
694
    def test_root_is_root(self):
693
 
        from bzrlib.transport import get_transport
694
 
        t = get_transport(self.get_readonly_url())
 
695
        t = transport.get_transport(self.get_readonly_url())
695
696
        url = t.base
696
697
        self.assertEqual(url, t.clone('..').base)
697
698
 
803
804
        self.requireFeature(test_lsprof.LSProfFeature)
804
805
        result_stream = StringIO()
805
806
        result = bzrlib.tests.VerboseTestResult(
806
 
            unittest._WritelnDecorator(result_stream),
 
807
            result_stream,
807
808
            descriptions=0,
808
809
            verbosity=2,
809
810
            )
835
836
        self.assertContainsRe(output,
836
837
            r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n")
837
838
 
 
839
    def test_uses_time_from_testtools(self):
 
840
        """Test case timings in verbose results should use testtools times"""
 
841
        import datetime
 
842
        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
 
843
            def startTest(self, test):
 
844
                self.time(datetime.datetime.utcfromtimestamp(1.145))
 
845
                super(TimeAddedVerboseTestResult, self).startTest(test)
 
846
            def addSuccess(self, test):
 
847
                self.time(datetime.datetime.utcfromtimestamp(51.147))
 
848
                super(TimeAddedVerboseTestResult, self).addSuccess(test)
 
849
            def report_tests_starting(self): pass
 
850
        sio = StringIO()
 
851
        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
 
852
        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
 
853
 
838
854
    def test_known_failure(self):
839
855
        """A KnownFailure being raised should trigger several result actions."""
840
856
        class InstrumentedTestResult(tests.ExtendedTestResult):
841
857
            def stopTestRun(self): pass
842
 
            def startTests(self): pass
843
 
            def report_test_start(self, test): pass
 
858
            def report_tests_starting(self): pass
844
859
            def report_known_failure(self, test, err=None, details=None):
845
860
                self._call = test, 'known failure'
846
861
        result = InstrumentedTestResult(None, None, None, None)
864
879
        # verbose test output formatting
865
880
        result_stream = StringIO()
866
881
        result = bzrlib.tests.VerboseTestResult(
867
 
            unittest._WritelnDecorator(result_stream),
 
882
            result_stream,
868
883
            descriptions=0,
869
884
            verbosity=2,
870
885
            )
880
895
        output = result_stream.getvalue()[prefix:]
881
896
        lines = output.splitlines()
882
897
        self.assertContainsRe(lines[0], r'XFAIL *\d+ms$')
 
898
        if sys.version_info > (2, 7):
 
899
            self.expectFailure("_ExpectedFailure on 2.7 loses the message",
 
900
                self.assertNotEqual, lines[1], '    ')
883
901
        self.assertEqual(lines[1], '    foo')
884
902
        self.assertEqual(2, len(lines))
885
903
 
893
911
        """Test the behaviour of invoking addNotSupported."""
894
912
        class InstrumentedTestResult(tests.ExtendedTestResult):
895
913
            def stopTestRun(self): pass
896
 
            def startTests(self): pass
897
 
            def report_test_start(self, test): pass
 
914
            def report_tests_starting(self): pass
898
915
            def report_unsupported(self, test, feature):
899
916
                self._call = test, feature
900
917
        result = InstrumentedTestResult(None, None, None, None)
919
936
        # verbose test output formatting
920
937
        result_stream = StringIO()
921
938
        result = bzrlib.tests.VerboseTestResult(
922
 
            unittest._WritelnDecorator(result_stream),
 
939
            result_stream,
923
940
            descriptions=0,
924
941
            verbosity=2,
925
942
            )
939
956
        """An UnavailableFeature being raised should invoke addNotSupported."""
940
957
        class InstrumentedTestResult(tests.ExtendedTestResult):
941
958
            def stopTestRun(self): pass
942
 
            def startTests(self): pass
943
 
            def report_test_start(self, test): pass
 
959
            def report_tests_starting(self): pass
944
960
            def addNotSupported(self, test, feature):
945
961
                self._call = test, feature
946
962
        result = InstrumentedTestResult(None, None, None, None)
988
1004
        class InstrumentedTestResult(tests.ExtendedTestResult):
989
1005
            calls = 0
990
1006
            def startTests(self): self.calls += 1
991
 
            def report_test_start(self, test): pass
992
1007
        result = InstrumentedTestResult(None, None, None, None)
993
1008
        def test_function():
994
1009
            pass
996
1011
        test.run(result)
997
1012
        self.assertEquals(1, result.calls)
998
1013
 
 
1014
    def test_startTests_only_once(self):
 
1015
        """With multiple tests startTests should still only be called once"""
 
1016
        class InstrumentedTestResult(tests.ExtendedTestResult):
 
1017
            calls = 0
 
1018
            def startTests(self): self.calls += 1
 
1019
        result = InstrumentedTestResult(None, None, None, None)
 
1020
        suite = unittest.TestSuite([
 
1021
            unittest.FunctionTestCase(lambda: None),
 
1022
            unittest.FunctionTestCase(lambda: None)])
 
1023
        suite.run(result)
 
1024
        self.assertEquals(1, result.calls)
 
1025
        self.assertEquals(2, result.count)
 
1026
 
999
1027
 
1000
1028
class TestUnicodeFilenameFeature(tests.TestCase):
1001
1029
 
1022
1050
        because of our use of global state.
1023
1051
        """
1024
1052
        old_root = tests.TestCaseInTempDir.TEST_ROOT
1025
 
        old_leak = tests.TestCase._first_thread_leaker_id
1026
1053
        try:
1027
1054
            tests.TestCaseInTempDir.TEST_ROOT = None
1028
 
            tests.TestCase._first_thread_leaker_id = None
1029
1055
            return testrunner.run(test)
1030
1056
        finally:
1031
1057
            tests.TestCaseInTempDir.TEST_ROOT = old_root
1032
 
            tests.TestCase._first_thread_leaker_id = old_leak
1033
1058
 
1034
1059
    def test_known_failure_failed_run(self):
1035
1060
        # run a test that generates a known failure which should be printed in
1081
1106
    def test_result_decorator(self):
1082
1107
        # decorate results
1083
1108
        calls = []
1084
 
        class LoggingDecorator(tests.ForwardingResult):
 
1109
        class LoggingDecorator(ExtendedToOriginalDecorator):
1085
1110
            def startTest(self, test):
1086
 
                tests.ForwardingResult.startTest(self, test)
 
1111
                ExtendedToOriginalDecorator.startTest(self, test)
1087
1112
                calls.append('start')
1088
1113
        test = unittest.FunctionTestCase(lambda:None)
1089
1114
        stream = StringIO()
1213
1238
        self.assertContainsRe(output_string, "--date [0-9.]+")
1214
1239
        self.assertLength(1, self._get_source_tree_calls)
1215
1240
 
 
1241
    def test_verbose_test_count(self):
 
1242
        """A verbose test run reports the right test count at the start"""
 
1243
        suite = TestUtil.TestSuite([
 
1244
            unittest.FunctionTestCase(lambda:None),
 
1245
            unittest.FunctionTestCase(lambda:None)])
 
1246
        self.assertEqual(suite.countTestCases(), 2)
 
1247
        stream = StringIO()
 
1248
        runner = tests.TextTestRunner(stream=stream, verbosity=2)
 
1249
        # Need to use the CountingDecorator as that's what sets num_tests
 
1250
        result = self.run_test_runner(runner, tests.CountingDecorator(suite))
 
1251
        self.assertStartsWith(stream.getvalue(), "running 2 tests")
 
1252
 
1216
1253
    def test_startTestRun(self):
1217
1254
        """run should call result.startTestRun()"""
1218
1255
        calls = []
1219
 
        class LoggingDecorator(tests.ForwardingResult):
 
1256
        class LoggingDecorator(ExtendedToOriginalDecorator):
1220
1257
            def startTestRun(self):
1221
 
                tests.ForwardingResult.startTestRun(self)
 
1258
                ExtendedToOriginalDecorator.startTestRun(self)
1222
1259
                calls.append('startTestRun')
1223
1260
        test = unittest.FunctionTestCase(lambda:None)
1224
1261
        stream = StringIO()
1230
1267
    def test_stopTestRun(self):
1231
1268
        """run should call result.stopTestRun()"""
1232
1269
        calls = []
1233
 
        class LoggingDecorator(tests.ForwardingResult):
 
1270
        class LoggingDecorator(ExtendedToOriginalDecorator):
1234
1271
            def stopTestRun(self):
1235
 
                tests.ForwardingResult.stopTestRun(self)
 
1272
                ExtendedToOriginalDecorator.stopTestRun(self)
1236
1273
                calls.append('stopTestRun')
1237
1274
        test = unittest.FunctionTestCase(lambda:None)
1238
1275
        stream = StringIO()
1241
1278
        result = self.run_test_runner(runner, test)
1242
1279
        self.assertLength(1, calls)
1243
1280
 
 
1281
    def test_unicode_test_output_on_ascii_stream(self):
 
1282
        """Showing results should always succeed even on an ascii console"""
 
1283
        class FailureWithUnicode(tests.TestCase):
 
1284
            def test_log_unicode(self):
 
1285
                self.log(u"\u2606")
 
1286
                self.fail("Now print that log!")
 
1287
        out = StringIO()
 
1288
        self.overrideAttr(osutils, "get_terminal_encoding",
 
1289
            lambda trace=False: "ascii")
 
1290
        result = self.run_test_runner(tests.TextTestRunner(stream=out),
 
1291
            FailureWithUnicode("test_log_unicode"))
 
1292
        self.assertContainsRe(out.getvalue(),
 
1293
            "Text attachment: log\n"
 
1294
            "-+\n"
 
1295
            "\d+\.\d+  \\\\u2606\n"
 
1296
            "-+\n")
 
1297
 
1244
1298
 
1245
1299
class SampleTestCase(tests.TestCase):
1246
1300
 
1421
1475
        sample_test = TestTestCase("method_that_times_a_bit_twice")
1422
1476
        output_stream = StringIO()
1423
1477
        result = bzrlib.tests.VerboseTestResult(
1424
 
            unittest._WritelnDecorator(output_stream),
 
1478
            output_stream,
1425
1479
            descriptions=0,
1426
1480
            verbosity=2)
1427
1481
        sample_test.run(result)
1655
1709
        self.assertEqual('original', obj.test_attr)
1656
1710
 
1657
1711
 
 
1712
class _MissingFeature(tests.Feature):
 
1713
    def _probe(self):
 
1714
        return False
 
1715
missing_feature = _MissingFeature()
 
1716
 
 
1717
 
 
1718
def _get_test(name):
 
1719
    """Get an instance of a specific example test.
 
1720
 
 
1721
    We protect this in a function so that they don't auto-run in the test
 
1722
    suite.
 
1723
    """
 
1724
 
 
1725
    class ExampleTests(tests.TestCase):
 
1726
 
 
1727
        def test_fail(self):
 
1728
            mutter('this was a failing test')
 
1729
            self.fail('this test will fail')
 
1730
 
 
1731
        def test_error(self):
 
1732
            mutter('this test errored')
 
1733
            raise RuntimeError('gotcha')
 
1734
 
 
1735
        def test_missing_feature(self):
 
1736
            mutter('missing the feature')
 
1737
            self.requireFeature(missing_feature)
 
1738
 
 
1739
        def test_skip(self):
 
1740
            mutter('this test will be skipped')
 
1741
            raise tests.TestSkipped('reason')
 
1742
 
 
1743
        def test_success(self):
 
1744
            mutter('this test succeeds')
 
1745
 
 
1746
        def test_xfail(self):
 
1747
            mutter('test with expected failure')
 
1748
            self.knownFailure('this_fails')
 
1749
 
 
1750
        def test_unexpected_success(self):
 
1751
            mutter('test with unexpected success')
 
1752
            self.expectFailure('should_fail', lambda: None)
 
1753
 
 
1754
    return ExampleTests(name)
 
1755
 
 
1756
 
 
1757
class TestTestCaseLogDetails(tests.TestCase):
 
1758
 
 
1759
    def _run_test(self, test_name):
 
1760
        test = _get_test(test_name)
 
1761
        result = testtools.TestResult()
 
1762
        test.run(result)
 
1763
        return result
 
1764
 
 
1765
    def test_fail_has_log(self):
 
1766
        result = self._run_test('test_fail')
 
1767
        self.assertEqual(1, len(result.failures))
 
1768
        result_content = result.failures[0][1]
 
1769
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1770
        self.assertContainsRe(result_content, 'this was a failing test')
 
1771
 
 
1772
    def test_error_has_log(self):
 
1773
        result = self._run_test('test_error')
 
1774
        self.assertEqual(1, len(result.errors))
 
1775
        result_content = result.errors[0][1]
 
1776
        self.assertContainsRe(result_content, 'Text attachment: log')
 
1777
        self.assertContainsRe(result_content, 'this test errored')
 
1778
 
 
1779
    def test_skip_has_no_log(self):
 
1780
        result = self._run_test('test_skip')
 
1781
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
1782
        skips = result.skip_reasons['reason']
 
1783
        self.assertEqual(1, len(skips))
 
1784
        test = skips[0]
 
1785
        self.assertFalse('log' in test.getDetails())
 
1786
 
 
1787
    def test_missing_feature_has_no_log(self):
 
1788
        # testtools doesn't know about addNotSupported, so it just gets
 
1789
        # considered as a skip
 
1790
        result = self._run_test('test_missing_feature')
 
1791
        self.assertEqual([missing_feature], result.skip_reasons.keys())
 
1792
        skips = result.skip_reasons[missing_feature]
 
1793
        self.assertEqual(1, len(skips))
 
1794
        test = skips[0]
 
1795
        self.assertFalse('log' in test.getDetails())
 
1796
 
 
1797
    def test_xfail_has_no_log(self):
 
1798
        result = self._run_test('test_xfail')
 
1799
        self.assertEqual(1, len(result.expectedFailures))
 
1800
        result_content = result.expectedFailures[0][1]
 
1801
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
1802
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
1803
 
 
1804
    def test_unexpected_success_has_log(self):
 
1805
        result = self._run_test('test_unexpected_success')
 
1806
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
1807
        # Inconsistency, unexpectedSuccesses is a list of tests,
 
1808
        # expectedFailures is a list of reasons?
 
1809
        test = result.unexpectedSuccesses[0]
 
1810
        details = test.getDetails()
 
1811
        self.assertTrue('log' in details)
 
1812
 
 
1813
 
 
1814
class TestTestCloning(tests.TestCase):
 
1815
    """Tests that test cloning of TestCases (as used by multiply_tests)."""
 
1816
 
 
1817
    def test_cloned_testcase_does_not_share_details(self):
 
1818
        """A TestCase cloned with clone_test does not share mutable attributes
 
1819
        such as details or cleanups.
 
1820
        """
 
1821
        class Test(tests.TestCase):
 
1822
            def test_foo(self):
 
1823
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))
 
1824
        orig_test = Test('test_foo')
 
1825
        cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)')
 
1826
        orig_test.run(unittest.TestResult())
 
1827
        self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes())
 
1828
        self.assertEqual(None, cloned_test.getDetails().get('foo'))
 
1829
 
 
1830
    def test_double_apply_scenario_preserves_first_scenario(self):
 
1831
        """Applying two levels of scenarios to a test preserves the attributes
 
1832
        added by both scenarios.
 
1833
        """
 
1834
        class Test(tests.TestCase):
 
1835
            def test_foo(self):
 
1836
                pass
 
1837
        test = Test('test_foo')
 
1838
        scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})]
 
1839
        scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})]
 
1840
        suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite())
 
1841
        suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite())
 
1842
        all_tests = list(tests.iter_suite_tests(suite))
 
1843
        self.assertLength(4, all_tests)
 
1844
        all_xys = sorted((t.x, t.y) for t in all_tests)
 
1845
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys)
 
1846
 
 
1847
 
1658
1848
# NB: Don't delete this; it's not actually from 0.11!
1659
1849
@deprecated_function(deprecated_in((0, 11, 0)))
1660
1850
def sample_deprecated_function():
1814
2004
                tree.branch.repository.bzrdir.root_transport)
1815
2005
 
1816
2006
 
1817
 
class SelfTestHelper:
 
2007
class SelfTestHelper(object):
1818
2008
 
1819
2009
    def run_selftest(self, **kwargs):
1820
2010
        """Run selftest returning its output."""
1880
2070
            def __call__(test, result):
1881
2071
                test.run(result)
1882
2072
            def run(test, result):
1883
 
                self.assertIsInstance(result, tests.ForwardingResult)
 
2073
                self.assertIsInstance(result, ExtendedToOriginalDecorator)
1884
2074
                calls.append("called")
1885
2075
            def countTestCases(self):
1886
2076
                return 1
1971
2161
            load_list='missing file name', list_only=True)
1972
2162
 
1973
2163
 
 
2164
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
 
2165
 
 
2166
    _test_needs_features = [features.subunit]
 
2167
 
 
2168
    def run_subunit_stream(self, test_name):
 
2169
        from subunit import ProtocolTestCase
 
2170
        def factory():
 
2171
            return TestUtil.TestSuite([_get_test(test_name)])
 
2172
        stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
 
2173
            test_suite_factory=factory)
 
2174
        test = ProtocolTestCase(stream)
 
2175
        result = testtools.TestResult()
 
2176
        test.run(result)
 
2177
        content = stream.getvalue()
 
2178
        return content, result
 
2179
 
 
2180
    def test_fail_has_log(self):
 
2181
        content, result = self.run_subunit_stream('test_fail')
 
2182
        self.assertEqual(1, len(result.failures))
 
2183
        self.assertContainsRe(content, '(?m)^log$')
 
2184
        self.assertContainsRe(content, 'this test will fail')
 
2185
 
 
2186
    def test_error_has_log(self):
 
2187
        content, result = self.run_subunit_stream('test_error')
 
2188
        self.assertContainsRe(content, '(?m)^log$')
 
2189
        self.assertContainsRe(content, 'this test errored')
 
2190
 
 
2191
    def test_skip_has_no_log(self):
 
2192
        content, result = self.run_subunit_stream('test_skip')
 
2193
        self.assertNotContainsRe(content, '(?m)^log$')
 
2194
        self.assertNotContainsRe(content, 'this test will be skipped')
 
2195
        self.assertEqual(['reason'], result.skip_reasons.keys())
 
2196
        skips = result.skip_reasons['reason']
 
2197
        self.assertEqual(1, len(skips))
 
2198
        test = skips[0]
 
2199
        # RemotedTestCase doesn't preserve the "details"
 
2200
        ## self.assertFalse('log' in test.getDetails())
 
2201
 
 
2202
    def test_missing_feature_has_no_log(self):
 
2203
        content, result = self.run_subunit_stream('test_missing_feature')
 
2204
        self.assertNotContainsRe(content, '(?m)^log$')
 
2205
        self.assertNotContainsRe(content, 'missing the feature')
 
2206
        self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys())
 
2207
        skips = result.skip_reasons['_MissingFeature\n']
 
2208
        self.assertEqual(1, len(skips))
 
2209
        test = skips[0]
 
2210
        # RemotedTestCase doesn't preserve the "details"
 
2211
        ## self.assertFalse('log' in test.getDetails())
 
2212
 
 
2213
    def test_xfail_has_no_log(self):
 
2214
        content, result = self.run_subunit_stream('test_xfail')
 
2215
        self.assertNotContainsRe(content, '(?m)^log$')
 
2216
        self.assertNotContainsRe(content, 'test with expected failure')
 
2217
        self.assertEqual(1, len(result.expectedFailures))
 
2218
        result_content = result.expectedFailures[0][1]
 
2219
        self.assertNotContainsRe(result_content, 'Text attachment: log')
 
2220
        self.assertNotContainsRe(result_content, 'test with expected failure')
 
2221
 
 
2222
    def test_unexpected_success_has_log(self):
 
2223
        content, result = self.run_subunit_stream('test_unexpected_success')
 
2224
        self.assertContainsRe(content, '(?m)^log$')
 
2225
        self.assertContainsRe(content, 'test with unexpected success')
 
2226
        self.expectFailure('subunit treats "unexpectedSuccess"'
 
2227
                           ' as a plain success',
 
2228
            self.assertEqual, 1, len(result.unexpectedSuccesses))
 
2229
        self.assertEqual(1, len(result.unexpectedSuccesses))
 
2230
        test = result.unexpectedSuccesses[0]
 
2231
        # RemotedTestCase doesn't preserve the "details"
 
2232
        ## self.assertTrue('log' in test.getDetails())
 
2233
 
 
2234
    def test_success_has_no_log(self):
 
2235
        content, result = self.run_subunit_stream('test_success')
 
2236
        self.assertEqual(1, result.testsRun)
 
2237
        self.assertNotContainsRe(content, '(?m)^log$')
 
2238
        self.assertNotContainsRe(content, 'this test succeeds')
 
2239
 
 
2240
 
1974
2241
class TestRunBzr(tests.TestCase):
1975
2242
 
1976
2243
    out = ''
2339
2606
            os.chdir = orig_chdir
2340
2607
        self.assertEqual(['foo', 'current'], chdirs)
2341
2608
 
 
2609
    def test_get_bzr_path_with_cwd_bzrlib(self):
 
2610
        self.get_source_path = lambda: ""
 
2611
        self.overrideAttr(os.path, "isfile", lambda path: True)
 
2612
        self.assertEqual(self.get_bzr_path(), "bzr")
 
2613
 
2342
2614
 
2343
2615
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
2344
2616
    """Tests that really need to do things with an external bzr."""
2934
3206
        tpr.register('bar', 'bBB.aAA.rRR')
2935
3207
        self.assertEquals('bbb.aaa.rrr', tpr.get('bar'))
2936
3208
        self.assertThat(self.get_log(),
2937
 
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", ELLIPSIS))
 
3209
            DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
 
3210
                           doctest.ELLIPSIS))
2938
3211
 
2939
3212
    def test_get_unknown_prefix(self):
2940
3213
        tpr = self._get_registry()
2960
3233
        self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp'))
2961
3234
 
2962
3235
 
 
3236
class TestThreadLeakDetection(tests.TestCase):
 
3237
    """Ensure when tests leak threads we detect and report it"""
 
3238
 
 
3239
    class LeakRecordingResult(tests.ExtendedTestResult):
 
3240
        def __init__(self):
 
3241
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3242
            self.leaks = []
 
3243
        def _report_thread_leak(self, test, leaks, alive):
 
3244
            self.leaks.append((test, leaks))
 
3245
 
 
3246
    def test_testcase_without_addCleanups(self):
 
3247
        """Check old TestCase instances don't break with leak detection"""
 
3248
        class Test(unittest.TestCase):
 
3249
            def runTest(self):
 
3250
                pass
 
3251
        result = self.LeakRecordingResult()
 
3252
        test = Test()
 
3253
        result.startTestRun()
 
3254
        test.run(result)
 
3255
        result.stopTestRun()
 
3256
        self.assertEqual(result._tests_leaking_threads_count, 0)
 
3257
        self.assertEqual(result.leaks, [])
 
3258
        
 
3259
    def test_thread_leak(self):
 
3260
        """Ensure a thread that outlives the running of a test is reported
 
3261
 
 
3262
        Uses a thread that blocks on an event, and is started by the inner
 
3263
        test case. As the thread outlives the inner case's run, it should be
 
3264
        detected as a leak, but the event is then set so that the thread can
 
3265
        be safely joined in cleanup so it's not leaked for real.
 
3266
        """
 
3267
        event = threading.Event()
 
3268
        thread = threading.Thread(name="Leaker", target=event.wait)
 
3269
        class Test(tests.TestCase):
 
3270
            def test_leak(self):
 
3271
                thread.start()
 
3272
        result = self.LeakRecordingResult()
 
3273
        test = Test("test_leak")
 
3274
        self.addCleanup(thread.join)
 
3275
        self.addCleanup(event.set)
 
3276
        result.startTestRun()
 
3277
        test.run(result)
 
3278
        result.stopTestRun()
 
3279
        self.assertEqual(result._tests_leaking_threads_count, 1)
 
3280
        self.assertEqual(result._first_thread_leaker_id, test.id())
 
3281
        self.assertEqual(result.leaks, [(test, set([thread]))])
 
3282
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3283
 
 
3284
    def test_multiple_leaks(self):
 
3285
        """Check multiple leaks are blamed on the test cases at fault
 
3286
 
 
3287
        Same concept as the previous test, but has one inner test method that
 
3288
        leaks two threads, and one that doesn't leak at all.
 
3289
        """
 
3290
        event = threading.Event()
 
3291
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
 
3292
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
 
3293
        thread_c = threading.Thread(name="LeakerC", target=event.wait)
 
3294
        class Test(tests.TestCase):
 
3295
            def test_first_leak(self):
 
3296
                thread_b.start()
 
3297
            def test_second_no_leak(self):
 
3298
                pass
 
3299
            def test_third_leak(self):
 
3300
                thread_c.start()
 
3301
                thread_a.start()
 
3302
        result = self.LeakRecordingResult()
 
3303
        first_test = Test("test_first_leak")
 
3304
        third_test = Test("test_third_leak")
 
3305
        self.addCleanup(thread_a.join)
 
3306
        self.addCleanup(thread_b.join)
 
3307
        self.addCleanup(thread_c.join)
 
3308
        self.addCleanup(event.set)
 
3309
        result.startTestRun()
 
3310
        unittest.TestSuite(
 
3311
            [first_test, Test("test_second_no_leak"), third_test]
 
3312
            ).run(result)
 
3313
        result.stopTestRun()
 
3314
        self.assertEqual(result._tests_leaking_threads_count, 2)
 
3315
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
 
3316
        self.assertEqual(result.leaks, [
 
3317
            (first_test, set([thread_b])),
 
3318
            (third_test, set([thread_a, thread_c]))])
 
3319
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
 
3320
 
 
3321
 
 
3322
class TestPostMortemDebugging(tests.TestCase):
 
3323
    """Check post mortem debugging works when tests fail or error"""
 
3324
 
 
3325
    class TracebackRecordingResult(tests.ExtendedTestResult):
 
3326
        def __init__(self):
 
3327
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
 
3328
            self.postcode = None
 
3329
        def _post_mortem(self, tb=None):
 
3330
            """Record the code object at the end of the current traceback"""
 
3331
            tb = tb or sys.exc_info()[2]
 
3332
            if tb is not None:
 
3333
                next = tb.tb_next
 
3334
                while next is not None:
 
3335
                    tb = next
 
3336
                    next = next.tb_next
 
3337
                self.postcode = tb.tb_frame.f_code
 
3338
        def report_error(self, test, err):
 
3339
            pass
 
3340
        def report_failure(self, test, err):
 
3341
            pass
 
3342
 
 
3343
    def test_location_unittest_error(self):
 
3344
        """Needs right post mortem traceback with erroring unittest case"""
 
3345
        class Test(unittest.TestCase):
 
3346
            def runTest(self):
 
3347
                raise RuntimeError
 
3348
        result = self.TracebackRecordingResult()
 
3349
        Test().run(result)
 
3350
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3351
 
 
3352
    def test_location_unittest_failure(self):
 
3353
        """Needs right post mortem traceback with failing unittest case"""
 
3354
        class Test(unittest.TestCase):
 
3355
            def runTest(self):
 
3356
                raise self.failureException
 
3357
        result = self.TracebackRecordingResult()
 
3358
        Test().run(result)
 
3359
        self.assertEqual(result.postcode, Test.runTest.func_code)
 
3360
 
 
3361
    def test_location_bt_error(self):
 
3362
        """Needs right post mortem traceback with erroring bzrlib.tests case"""
 
3363
        class Test(tests.TestCase):
 
3364
            def test_error(self):
 
3365
                raise RuntimeError
 
3366
        result = self.TracebackRecordingResult()
 
3367
        Test("test_error").run(result)
 
3368
        self.assertEqual(result.postcode, Test.test_error.func_code)
 
3369
 
 
3370
    def test_location_bt_failure(self):
 
3371
        """Needs right post mortem traceback with failing bzrlib.tests case"""
 
3372
        class Test(tests.TestCase):
 
3373
            def test_failure(self):
 
3374
                raise self.failureException
 
3375
        result = self.TracebackRecordingResult()
 
3376
        Test("test_failure").run(result)
 
3377
        self.assertEqual(result.postcode, Test.test_failure.func_code)
 
3378
 
 
3379
    def test_env_var_triggers_post_mortem(self):
 
3380
        """Check pdb.post_mortem is called iff BZR_TEST_PDB is set"""
 
3381
        import pdb
 
3382
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
 
3383
        post_mortem_calls = []
 
3384
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
 
3385
        self.overrideEnv('BZR_TEST_PDB', None)
 
3386
        result._post_mortem(1)
 
3387
        self.overrideEnv('BZR_TEST_PDB', 'on')
 
3388
        result._post_mortem(2)
 
3389
        self.assertEqual([2], post_mortem_calls)
 
3390
 
 
3391
 
2963
3392
class TestRunSuite(tests.TestCase):
2964
3393
 
2965
3394
    def test_runner_class(self):
2976
3405
                                                self.verbosity)
2977
3406
        tests.run_suite(suite, runner_class=MyRunner, stream=StringIO())
2978
3407
        self.assertLength(1, calls)
 
3408
 
 
3409
 
 
3410
class TestEnvironHandling(tests.TestCase):
 
3411
 
 
3412
    def test_overrideEnv_None_called_twice_doesnt_leak(self):
 
3413
        self.failIf('MYVAR' in os.environ)
 
3414
        self.overrideEnv('MYVAR', '42')
 
3415
        # We use an embedded test to make sure we fix the _captureVar bug
 
3416
        class Test(tests.TestCase):
 
3417
            def test_me(self):
 
3418
                # The first call save the 42 value
 
3419
                self.overrideEnv('MYVAR', None)
 
3420
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3421
                # Make sure we can call it twice
 
3422
                self.overrideEnv('MYVAR', None)
 
3423
                self.assertEquals(None, os.environ.get('MYVAR'))
 
3424
        output = StringIO()
 
3425
        result = tests.TextTestResult(output, 0, 1)
 
3426
        Test('test_me').run(result)
 
3427
        if not result.wasStrictlySuccessful():
 
3428
            self.fail(output.getvalue())
 
3429
        # We get our value back
 
3430
        self.assertEquals('42', os.environ.get('MYVAR'))
 
3431
 
 
3432
 
 
3433
class TestIsolatedEnv(tests.TestCase):
 
3434
    """Test isolating tests from os.environ.
 
3435
 
 
3436
    Since we use tests that are already isolated from os.environ a bit of care
 
3437
    should be taken when designing the tests to avoid bootstrap side-effects.
 
3438
    The tests start an already clean os.environ which allow doing valid
 
3439
    assertions about which variables are present or not and design tests around
 
3440
    these assertions.
 
3441
    """
 
3442
 
 
3443
    class ScratchMonkey(tests.TestCase):
 
3444
 
 
3445
        def test_me(self):
 
3446
            pass
 
3447
 
 
3448
    def test_basics(self):
 
3449
        # Make sure we know the definition of BZR_HOME: not part of os.environ
 
3450
        # for tests.TestCase.
 
3451
        self.assertTrue('BZR_HOME' in tests.isolated_environ)
 
3452
        self.assertEquals(None, tests.isolated_environ['BZR_HOME'])
 
3453
        # Being part of isolated_environ, BZR_HOME should not appear here
 
3454
        self.assertFalse('BZR_HOME' in os.environ)
 
3455
        # Make sure we know the definition of LINES: part of os.environ for
 
3456
        # tests.TestCase
 
3457
        self.assertTrue('LINES' in tests.isolated_environ)
 
3458
        self.assertEquals('25', tests.isolated_environ['LINES'])
 
3459
        self.assertEquals('25', os.environ['LINES'])
 
3460
 
 
3461
    def test_injecting_unknown_variable(self):
 
3462
        # BZR_HOME is known to be absent from os.environ
 
3463
        test = self.ScratchMonkey('test_me')
 
3464
        tests.override_os_environ(test, {'BZR_HOME': 'foo'})
 
3465
        self.assertEquals('foo', os.environ['BZR_HOME'])
 
3466
        tests.restore_os_environ(test)
 
3467
        self.assertFalse('BZR_HOME' in os.environ)
 
3468
 
 
3469
    def test_injecting_known_variable(self):
 
3470
        test = self.ScratchMonkey('test_me')
 
3471
        # LINES is known to be present in os.environ
 
3472
        tests.override_os_environ(test, {'LINES': '42'})
 
3473
        self.assertEquals('42', os.environ['LINES'])
 
3474
        tests.restore_os_environ(test)
 
3475
        self.assertEquals('25', os.environ['LINES'])
 
3476
 
 
3477
    def test_deleting_variable(self):
 
3478
        test = self.ScratchMonkey('test_me')
 
3479
        # LINES is known to be present in os.environ
 
3480
        tests.override_os_environ(test, {'LINES': None})
 
3481
        self.assertTrue('LINES' not in os.environ)
 
3482
        tests.restore_os_environ(test)
 
3483
        self.assertEquals('25', os.environ['LINES'])
 
3484
 
 
3485
 
 
3486
class TestDocTestSuiteIsolation(tests.TestCase):
 
3487
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.
 
3488
 
 
3489
    Since tests.TestCase alreay provides an isolation from os.environ, we use
 
3490
    the clean environment as a base for testing. To precisely capture the
 
3491
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
 
3492
    compare against.
 
3493
 
 
3494
    We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`,
 
3495
    not `os.environ` so each test overrides it to suit its needs.
 
3496
 
 
3497
    """
 
3498
 
 
3499
    def get_doctest_suite_for_string(self, klass, string):
 
3500
        class Finder(doctest.DocTestFinder):
 
3501
 
 
3502
            def find(*args, **kwargs):
 
3503
                test = doctest.DocTestParser().get_doctest(
 
3504
                    string, {}, 'foo', 'foo.py', 0)
 
3505
                return [test]
 
3506
 
 
3507
        suite = klass(test_finder=Finder())
 
3508
        return suite
 
3509
 
 
3510
    def run_doctest_suite_for_string(self, klass, string):
 
3511
        suite = self.get_doctest_suite_for_string(klass, string)
 
3512
        output = StringIO()
 
3513
        result = tests.TextTestResult(output, 0, 1)
 
3514
        suite.run(result)
 
3515
        return result, output
 
3516
 
 
3517
    def assertDocTestStringSucceds(self, klass, string):
 
3518
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3519
        if not result.wasStrictlySuccessful():
 
3520
            self.fail(output.getvalue())
 
3521
 
 
3522
    def assertDocTestStringFails(self, klass, string):
 
3523
        result, output = self.run_doctest_suite_for_string(klass, string)
 
3524
        if result.wasStrictlySuccessful():
 
3525
            self.fail(output.getvalue())
 
3526
 
 
3527
    def test_injected_variable(self):
 
3528
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
 
3529
        test = """
 
3530
            >>> import os
 
3531
            >>> os.environ['LINES']
 
3532
            '42'
 
3533
            """
 
3534
        # doctest.DocTestSuite fails as it sees '25'
 
3535
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3536
        # tests.DocTestSuite sees '42'
 
3537
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)
 
3538
 
 
3539
    def test_deleted_variable(self):
 
3540
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
 
3541
        test = """
 
3542
            >>> import os
 
3543
            >>> os.environ.get('LINES')
 
3544
            """
 
3545
        # doctest.DocTestSuite fails as it sees '25'
 
3546
        self.assertDocTestStringFails(doctest.DocTestSuite, test)
 
3547
        # tests.DocTestSuite sees None
 
3548
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test)